This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a0a2809324 [Enhancement](multi-catalog) support hms event 
deserialization for HDP/CDH Hive versions. (#17660)
a0a2809324 is described below

commit a0a2809324f079a5f16fc7fad55709cc946e25d4
Author: Xiangyu Wang <[email protected]>
AuthorDate: Mon Mar 13 09:47:28 2023 +0800

    [Enhancement](multi-catalog) support hms event deserialization for HDP/CDH 
Hive versions. (#17660)
    
    Some HDP/CDH Hive versions gzip-compress the message body of the HMS
    NotificationEvent, so com.qihoo.finance.hms.event.MetastoreEventFactory
    cannot deserialize it correctly.
---
 .../datasource/hive/event/AddPartitionEvent.java   |   2 +-
 .../datasource/hive/event/AlterPartitionEvent.java |   2 +-
 .../datasource/hive/event/AlterTableEvent.java     |   2 +-
 .../datasource/hive/event/CreateTableEvent.java    |   3 +-
 .../datasource/hive/event/DropPartitionEvent.java  |   2 +-
 .../datasource/hive/event/DropTableEvent.java      |   2 +-
 .../hive/event/GzipJSONMessageDeserializer.java    | 172 +++++++++++++++++++++
 .../hive/event/MetastoreEventsProcessor.java       |  14 +-
 .../event/GzipJSONMessageDeserializerTest.java     |  63 ++++++++
 9 files changed, 252 insertions(+), 10 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
index 004efa3d15..e5f8d1bb71 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
@@ -49,7 +49,7 @@ public class AddPartitionEvent extends MetastoreTableEvent {
                 .checkNotNull(event.getMessage(), debugString("Event message 
is null"));
         try {
             AddPartitionMessage addPartitionMessage =
-                    MetastoreEventsProcessor.getMessageDeserializer()
+                    
MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat())
                             .getAddPartitionMessage(event.getMessage());
             hmsTbl = 
Preconditions.checkNotNull(addPartitionMessage.getTableObj());
             Iterable<Partition> addedPartitions = 
addPartitionMessage.getPartitionObjs();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
index b2ba44f842..5209edccab 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
@@ -52,7 +52,7 @@ public class AlterPartitionEvent extends MetastoreTableEvent {
                 .checkNotNull(event.getMessage(), debugString("Event message 
is null"));
         try {
             AlterPartitionMessage alterPartitionMessage =
-                    MetastoreEventsProcessor.getMessageDeserializer()
+                    
MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat())
                             .getAlterPartitionMessage(event.getMessage());
             hmsTbl = 
Preconditions.checkNotNull(alterPartitionMessage.getTableObj());
             partitionBefore = 
Preconditions.checkNotNull(alterPartitionMessage.getPtnObjBefore());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
index b14a1c6577..0dea5e8968 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
@@ -48,7 +48,7 @@ public class AlterTableEvent extends MetastoreTableEvent {
                 .checkNotNull(event.getMessage(), debugString("Event message 
is null"));
         try {
             JSONAlterTableMessage alterTableMessage =
-                    (JSONAlterTableMessage) 
MetastoreEventsProcessor.getMessageDeserializer()
+                    (JSONAlterTableMessage) 
MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat())
                             .getAlterTableMessage(event.getMessage());
             tableAfter = 
Preconditions.checkNotNull(alterTableMessage.getTableObjAfter());
             tableBefore = 
Preconditions.checkNotNull(alterTableMessage.getTableObjBefore());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
index e4e8e8bb4b..2d364c4c14 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
@@ -42,7 +42,8 @@ public class CreateTableEvent extends MetastoreTableEvent {
                 .checkNotNull(event.getMessage(), debugString("Event message 
is null"));
         try {
             CreateTableMessage createTableMessage =
-                    
MetastoreEventsProcessor.getMessageDeserializer().getCreateTableMessage(event.getMessage());
+                    
MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat())
+                            .getCreateTableMessage(event.getMessage());
             hmsTbl = 
Preconditions.checkNotNull(createTableMessage.getTableObj());
         } catch (Exception e) {
             throw new MetastoreNotificationException(
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
index cd5ed5bfbd..1dce403f55 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
@@ -48,7 +48,7 @@ public class DropPartitionEvent extends MetastoreTableEvent {
                 .checkNotNull(event.getMessage(), debugString("Event message 
is null"));
         try {
             DropPartitionMessage dropPartitionMessage =
-                    MetastoreEventsProcessor.getMessageDeserializer()
+                    
MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat())
                             .getDropPartitionMessage(event.getMessage());
             hmsTbl = 
Preconditions.checkNotNull(dropPartitionMessage.getTableObj());
             List<Map<String, String>> droppedPartitions = 
dropPartitionMessage.getPartitions();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
index 2b84c1bb42..aa74c67512 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
@@ -42,7 +42,7 @@ public class DropTableEvent extends MetastoreTableEvent {
                 .checkNotNull(event.getMessage(), debugString("Event message 
is null"));
         try {
             JSONDropTableMessage dropTableMessage =
-                    (JSONDropTableMessage) 
MetastoreEventsProcessor.getMessageDeserializer()
+                    (JSONDropTableMessage) 
MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat())
                             .getDropTableMessage(event.getMessage());
             tableName = dropTableMessage.getTable();
         } catch (Exception e) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/GzipJSONMessageDeserializer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/GzipJSONMessageDeserializer.java
new file mode 100644
index 0000000000..0606f4e44d
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/GzipJSONMessageDeserializer.java
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.zip.GZIPInputStream;
+
+public class GzipJSONMessageDeserializer extends JSONMessageDeserializer {
+    private static final Logger LOG = 
LoggerFactory.getLogger(GzipJSONMessageDeserializer.class.getName());
+
+    public GzipJSONMessageDeserializer() {
+    }
+
+    private static String deCompress(String messageBody) {
+        try {
+            byte[] decodedBytes = 
Base64.getDecoder().decode(messageBody.getBytes(StandardCharsets.UTF_8));
+            String body;
+            ByteArrayInputStream in = new ByteArrayInputStream(decodedBytes);
+            try {
+                GZIPInputStream is = new GZIPInputStream(in);
+                try {
+                    byte[] bytes = IOUtils.toByteArray(is);
+                    body = new String(bytes, StandardCharsets.UTF_8);
+                } finally {
+                    try {
+                        is.close();
+                    } catch (Throwable ignore) {
+                        LOG.warn("close GZIPInputStream failed", ignore);
+                    }
+                }
+            } finally {
+                try {
+                    in.close();
+                } catch (Throwable ignore) {
+                    LOG.warn("close ByteArrayInputStream failed", ignore);
+                }
+            }
+            return body;
+        } catch (Exception e) {
+            LOG.error("cannot decode the stream", e);
+            throw new RuntimeException("cannot decode the stream ", e);
+        }
+    }
+
+    public CreateDatabaseMessage getCreateDatabaseMessage(String messageBody) {
+        return super.getCreateDatabaseMessage(deCompress(messageBody));
+    }
+
+    public AlterDatabaseMessage getAlterDatabaseMessage(String messageBody) {
+        return super.getAlterDatabaseMessage(deCompress(messageBody));
+    }
+
+    public DropDatabaseMessage getDropDatabaseMessage(String messageBody) {
+        return super.getDropDatabaseMessage(deCompress(messageBody));
+    }
+
+    public CreateTableMessage getCreateTableMessage(String messageBody) {
+        return super.getCreateTableMessage(deCompress(messageBody));
+    }
+
+    public AlterTableMessage getAlterTableMessage(String messageBody) {
+        return super.getAlterTableMessage(deCompress(messageBody));
+    }
+
+    public DropTableMessage getDropTableMessage(String messageBody) {
+        return super.getDropTableMessage(deCompress(messageBody));
+    }
+
+    public AddPartitionMessage getAddPartitionMessage(String messageBody) {
+        return super.getAddPartitionMessage(deCompress(messageBody));
+    }
+
+    public AlterPartitionMessage getAlterPartitionMessage(String messageBody) {
+        return super.getAlterPartitionMessage(deCompress(messageBody));
+    }
+
+    public DropPartitionMessage getDropPartitionMessage(String messageBody) {
+        return super.getDropPartitionMessage(deCompress(messageBody));
+    }
+
+    public CreateFunctionMessage getCreateFunctionMessage(String messageBody) {
+        return super.getCreateFunctionMessage(deCompress(messageBody));
+    }
+
+    public DropFunctionMessage getDropFunctionMessage(String messageBody) {
+        return super.getDropFunctionMessage(deCompress(messageBody));
+    }
+
+    public InsertMessage getInsertMessage(String messageBody) {
+        return super.getInsertMessage(deCompress(messageBody));
+    }
+
+    public AddPrimaryKeyMessage getAddPrimaryKeyMessage(String messageBody) {
+        return super.getAddPrimaryKeyMessage(deCompress(messageBody));
+    }
+
+    public AddForeignKeyMessage getAddForeignKeyMessage(String messageBody) {
+        return super.getAddForeignKeyMessage(deCompress(messageBody));
+    }
+
+    public AddUniqueConstraintMessage getAddUniqueConstraintMessage(String 
messageBody) {
+        return super.getAddUniqueConstraintMessage(deCompress(messageBody));
+    }
+
+    public AddNotNullConstraintMessage getAddNotNullConstraintMessage(String 
messageBody) {
+        return super.getAddNotNullConstraintMessage(deCompress(messageBody));
+    }
+
+    public DropConstraintMessage getDropConstraintMessage(String messageBody) {
+        return super.getDropConstraintMessage(deCompress(messageBody));
+    }
+
+    public OpenTxnMessage getOpenTxnMessage(String messageBody) {
+        return super.getOpenTxnMessage(deCompress(messageBody));
+    }
+
+    public CommitTxnMessage getCommitTxnMessage(String messageBody) {
+        return super.getCommitTxnMessage(deCompress(messageBody));
+    }
+
+    public AbortTxnMessage getAbortTxnMessage(String messageBody) {
+        return super.getAbortTxnMessage(deCompress(messageBody));
+    }
+
+    public AllocWriteIdMessage getAllocWriteIdMessage(String messageBody) {
+        return super.getAllocWriteIdMessage(deCompress(messageBody));
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
index 1ff3bd98b2..ad89bb2fbe 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
@@ -57,8 +57,11 @@ public class MetastoreEventsProcessor extends MasterDaemon {
             "hive.metastore.notifications.add.thrift.objects";
 
     // for deserializing from JSON strings from metastore event
-    private static final MessageDeserializer MESSAGE_DESERIALIZER = new 
JSONMessageDeserializer();
-
+    private static final MessageDeserializer JSON_MESSAGE_DESERIALIZER = new 
JSONMessageDeserializer();
+    // for deserializing from GZIP JSON strings from metastore event
+    // (some HDP Hive and CDH Hive versions use this format)
+    private static final MessageDeserializer GZIP_JSON_MESSAGE_DESERIALIZER = 
new GzipJSONMessageDeserializer();
+    private static final String GZIP_JSON_FORMAT_PREFIX = "gzip";
 
     // event factory which is used to get or create MetastoreEvents
     private final MetastoreEventFactory metastoreEventFactory;
@@ -145,7 +148,10 @@ public class MetastoreEventsProcessor extends MasterDaemon 
{
         }
     }
 
-    public static MessageDeserializer getMessageDeserializer() {
-        return MESSAGE_DESERIALIZER;
+    public static MessageDeserializer getMessageDeserializer(String 
messageFormat) {
+        if (messageFormat != null && 
messageFormat.startsWith(GZIP_JSON_FORMAT_PREFIX)) {
+            return GZIP_JSON_MESSAGE_DESERIALIZER;
+        }
+        return JSON_MESSAGE_DESERIALIZER;
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/event/GzipJSONMessageDeserializerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/event/GzipJSONMessageDeserializerTest.java
new file mode 100644
index 0000000000..7a3bc29d32
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/event/GzipJSONMessageDeserializerTest.java
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class GzipJSONMessageDeserializerTest {
+
+    @Test
+    public void testGetAlterPartitionMessage() {
+        // copy from an online HDP Hive cluster,
+        // eventType: ALTER_PARTITION, messageFormat: gzip(json-2.0)
+        String messageBody = 
"H4sIAAAAAAAAAO1WW2sbRxT+L/usjHa1ullQqOMkbUpSBWwKJVuW0e7IGntvmZlVcIwhT8UlfuhDiimUGk"
+                + 
"NKk5fQQkmD3dI/Y8nuv+iZmZWtXa+VKNSUgh7sHc05cy5z5jvf2TY4YUPCjI4hBoz2RadaTVqW3WwP0SANNlFvI/TRIzqIY"
+                + 
"xQR0Vky27ZRUYeoRx4wGnk0wQEcH9AhqbqfdlfXPr735coKWuneB0W/B6I+jVzsczeOAhoR2BW4FxAQyM3ETd0eGeAhjZnL"
+                + 
"iBcz3wXTLhdYUC6oByo+nRxa20rkwfvLny9/cvuWu7Z8897tiazb2/iMxxGItx3DcowOfLhgsHDm8OQYOxXHqOWP5zPQKnZ"
+                + 
"eReZ/nrnWqGsNaktrVrPVqNVNq2lLUWNaZMqd5qWdlt6BSPUiSyngAhYPM0GlUbmcbZacoCEpTQe+NFovTeP0+cF499uz17"
+                + 
"+M/vxuvP/m7/3fQG2nxMfUrfl4a243xwdnrw/H+z+Nfzi4wgEZkki4HhZkPWZzOzg5enZy/Obk7dPTo59Hu1+f/no8042Ad"
+                + 
"/VhLsDy6MdnVxj3scDuEAdpufEeXaeRmHFBo6d/SMtflZwd+H0OYLWb5gaNWBytB1ueV00BmVUFxceYkUEMv6v5x4v8XnVe"
+                + 
"NBRii9k6wgn2BgQNsB/HCZIe0aMA0RjFzENd5t2NklTciVmIRQ4M89jopqJgJION6Jei5oZVBpva/I5XCfPzKA9xokGnzFQ"
+                + 
"mX0vWHK6c4oA+gXuLI9TPwu2oxwCl25Fm2gXs6uOmKuxSKa61zDJn+jcr2/pxWNY7msW0w1pBphaZrJivluX9qYysWq4WpV"
+                + 
"nqTKwSYCQf0DPGe9+MXrwCMI/2js5RsTTzfpZUfRLMNhGHCvM4ZR7hiEP1Q4yiNHyAmViJAz6pV2WGOuwLZCrNbceBzGXPc"
+                + 
"ORvR/lLPSFXFfmvT0ngcyV9qJQjHF4oT/fnyYmiNXkLE1mUBoFkOCUHR0TthkRg2WDUrnLixWEI3ezcTEkzd7JOVQyp0M6v"
+                + 
"Narp3n9VPIXuf53xlFDF7KgmQVxzRBmzXBXLFLeUxhLE/1qtJA1dFUZy3c8lB3tHA18B1WMEnsdkvgtp9G74AtQzBOsGBPq"
+                + 
"C4YhTWdYAc3HLD9bU1CTbwfnEphR7qbdJBGTkwrzModUrpZoOJYtZtanvj07+Ojx9/nK8+/vZ4d7J26PsEl+8Ojt8OTvGrC"
+                + 
"OVtyOdL8p5RzaSfxaqI/OGbTUU40x150nvzA3Murtal8jUahVGDCDGTLc9zbOKZq2s89JmXRMAjOGUr7E0kqDtJnLuxwFX4"
+                + 
"z7cKPSXMFFTcLtu11v1RsV4zKggd304XDE2ydYX8ilzsKlqA8drZs02LduCCAxZPCr5FYb8mwQ4llwa9QtUZ0luySw4RtkE"
+                + 
"9R5D/byDUsnEb7Vbsyf+xXy/mO//s/m+KrH20QVOFtP+Ytp/32m/NTNYW17WTHLVRKDJVcQCB6v0iRYuNTT3AR3eoQHhU/d"
+                + 
"7cYMlLGVepqRp4ljuw7iw4I0Fbyx4Y8EbC974f/NG4zp5Y+cflz5IfxgZAAA=";
+        MessageDeserializer messageDeserializer = 
MetastoreEventsProcessor.getMessageDeserializer("gzip(json-2.0)");
+        Assert.assertTrue(messageDeserializer instanceof 
GzipJSONMessageDeserializer);
+
+        try {
+            AlterPartitionMessage alterPartitionMessage = 
messageDeserializer.getAlterPartitionMessage(messageBody);
+            Assert.assertTrue(alterPartitionMessage != null);
+            Assert.assertTrue(alterPartitionMessage.getEventType() == 
EventMessage.EventType.ALTER_PARTITION);
+            Assert.assertTrue(alterPartitionMessage.getTableObj() != null);
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.assertTrue(false);
+        }
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to