This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fa5c5d4d62d31123c03290953af827ec4c94b79e Author: lipenghui <peng...@apache.org> AuthorDate: Tue Jun 15 05:49:52 2021 +0800 Fix the unit tests for the websocket and run tests under websocket group (#10921) --- build/run_unit_group.sh | 2 +- .../pulsar/websocket/proxy/SimpleConsumerSocket.java | 20 ++++++++++++-------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh index 9c24774..00b3fb0 100755 --- a/build/run_unit_group.sh +++ b/build/run_unit_group.sh @@ -42,7 +42,7 @@ function broker_group_1() { } function broker_group_2() { - $MVN_TEST_COMMAND -pl pulsar-broker -Dgroups='schema,utils,functions-worker,broker-io,broker-discovery,broker-compaction,broker-naming,other' + $MVN_TEST_COMMAND -pl pulsar-broker -Dgroups='schema,utils,functions-worker,broker-io,broker-discovery,broker-compaction,broker-naming,websocket,other' } function broker_client_api() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java index f6917cc..749bfdcd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java @@ -79,15 +79,19 @@ public class SimpleConsumerSocket { public synchronized void onMessage(String msg) throws JsonParseException, IOException { receivedMessages.incrementAndGet(); JsonObject message = new Gson().fromJson(msg, JsonObject.class); - String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString(); - consumerBuffer.add(messageId); - if (customMessageHandler != null) { - this.getRemote().sendString(customMessageHandler.handle(messageId, message)); + if (message.get(X_PULSAR_MESSAGE_ID) != null) { + String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString(); + consumerBuffer.add(messageId); + if (customMessageHandler != null) { + this.getRemote().sendString(customMessageHandler.handle(messageId, message)); + } else { + JsonObject ack = new JsonObject(); + ack.add("messageId", new JsonPrimitive(messageId)); + // Acking the proxy + this.getRemote().sendString(ack.toString()); + } } else { - JsonObject ack = new JsonObject(); - ack.add("messageId", new JsonPrimitive(messageId)); - // Acking the proxy - this.getRemote().sendString(ack.toString()); + consumerBuffer.add(message.toString()); } }