This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch java in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit fd7eb2cb713ab4d265c9ff907105edd186f7f586 Author: Aaron Ai <[email protected]> AuthorDate: Sun Jul 3 17:02:57 2022 +0800 Java: fix compile issue on JDK17 --- .github/workflows/java_build.yml | 2 +- .../rocketmq/client/java/impl/ClientImpl.java | 2 +- .../client/java/impl/consumer/ConsumerImpl.java | 12 +-- .../java/impl/consumer/ProcessQueueImpl.java | 81 ++++++++++++---- .../java/impl/consumer/PushConsumerSettings.java | 3 +- .../java/impl/consumer/SimpleConsumerSettings.java | 3 +- .../java/impl/producer/ProducerSettings.java | 3 +- .../java/impl/producer/TransactionImplTest.java | 102 --------------------- java/style/spotbugs-suppressions.xml | 41 ++++++++- 9 files changed, 112 insertions(+), 137 deletions(-) diff --git a/.github/workflows/java_build.yml b/.github/workflows/java_build.yml index 8bf1d0e..eda1e09 100644 --- a/.github/workflows/java_build.yml +++ b/.github/workflows/java_build.yml @@ -2,7 +2,7 @@ name: Java Build on: [push, pull_request] jobs: java_build: - name: "check (${{ matrix.os }} JDK-${{ matrix.jdk }})" + name: "${{ matrix.os }} JDK-${{ matrix.jdk }}" runs-on: ${{ matrix.os }} strategy: matrix: diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java index b16221f..21aa92e 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java @@ -435,7 +435,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, LOGGER.info("Topic route result is updated, topic={}, clientId={}, {} => {}", topic, clientId, old, topicRouteDataResult); } - future0.set(null); + future0.setFuture(Futures.immediateVoidFuture()); onTopicRouteDataResultUpdate0(topic, topicRouteDataResult); } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java index 18cee43..539b876 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java @@ -146,12 +146,8 @@ abstract class ConsumerImpl extends ClientImpl { final Metadata metadata = sign(); future = clientManager.ackMessage(endpoints, metadata, request, clientConfiguration.getRequestTimeout()); } catch (Throwable t) { - final SettableFuture<AckMessageResponse> future0 = SettableFuture.create(); - future0.setException(t); - future = future0; + return Futures.immediateFailedFuture(t); } - final String topic = messageView.getTopic(); - final MessageId messageId = messageView.getMessageId(); Futures.addCallback(future, new FutureCallback<AckMessageResponse>() { @Override public void onSuccess(AckMessageResponse response) { @@ -160,10 +156,6 @@ abstract class ConsumerImpl extends ClientImpl { final Duration duration = stopwatch.elapsed(); MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ? MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR; - if (!Code.OK.equals(code)) { - LOGGER.error("Failed to ack message, code={}, status message=[{}], topic={}, messageId={}, " + - "clientId={}", code, status.getMessage(), topic, messageId, clientId); - } doAfter(MessageHookPoints.ACK, messageCommons, duration, messageHookPointsStatus); } @@ -171,8 +163,6 @@ abstract class ConsumerImpl extends ClientImpl { public void onFailure(Throwable t) { final Duration duration = stopwatch.elapsed(); doAfter(MessageHookPoints.ACK, messageCommons, duration, MessageHookPointsStatus.ERROR); - LOGGER.error("Exception raised during message acknowledgement, topic={}, messageId={}, clientId={}", - topic, messageId, clientId, t); } }, MoreExecutors.directExecutor()); return future; diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java index dc648f6..35c3b50 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java @@ -376,12 +376,42 @@ class ProcessQueueImpl implements ProcessQueue { statsConsumptionResult(consumeResult); eraseMessage(messageView); if (ConsumeResult.SUCCESS.equals(consumeResult)) { - consumer.ackMessage(messageView); + ackMessage(messageView); return; } nackMessage(messageView); } + private void ackMessage(MessageViewImpl messageView) { + final String clientId = consumer.getClientId(); + final String consumerGroup = consumer.getConsumerGroup(); + final MessageId messageId = messageView.getMessageId(); + final Endpoints endpoints = messageView.getEndpoints(); + final ListenableFuture<AckMessageResponse> future = consumer.ackMessage(messageView); + Futures.addCallback(future, new FutureCallback<AckMessageResponse>() { + @Override + public void onSuccess(AckMessageResponse response) { + final Status status = response.getStatus(); + final Code code = status.getCode(); + if (Code.OK.equals(code)) { + LOGGER.debug("Ack message successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, " + + "endpoints={}", clientId, consumerGroup, messageId, mq, endpoints); + return; + } + LOGGER.error("Failed to ack message, clientId={}, consumerGroup={}, messageId={}, mq={}, " + + "endpoints={}, code={}, status message={}", clientId, consumerGroup, messageId, mq, + endpoints, code, status.getMessage()); + } + + @Override + public void onFailure(Throwable t) { + LOGGER.error("Exception raised while acknowledging message, clientId={}, consumerGroup={}, " + + "messageId={}, mq={}, endpoints={}", clientId, consumerGroup, messageId, mq, + endpoints, t); + } + }, MoreExecutors.directExecutor()); + } + private void nackMessage(MessageViewImpl messageView) { final Duration duration = consumer.getRetryPolicy().getNextAttemptDelay(messageView.getDeliveryAttempt()); consumer.changeInvisibleDuration(messageView, duration); @@ -452,6 +482,9 @@ class ProcessQueueImpl implements ProcessQueue { final ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future = consumer.forwardMessageToDeadLetterQueue(messageView); final String clientId = consumer.getClientId(); + final String consumerGroup = consumer.getConsumerGroup(); + final MessageId messageId = messageView.getMessageId(); + final Endpoints endpoints = messageView.getEndpoints(); Futures.addCallback(future, new FutureCallback<ForwardMessageToDeadLetterQueueResponse>() { @Override public void onSuccess(ForwardMessageToDeadLetterQueueResponse response) { @@ -460,26 +493,31 @@ class ProcessQueueImpl implements ProcessQueue { // Log failure and retry later. if (!Code.OK.equals(code)) { LOGGER.error("Failed to forward message to dead letter queue, would attempt to re-forward later," + - " clientId={}, messageId={}, attempt={}, mq={}, code={}, status message={}", - clientId, messageView.getMessageId(), attempt, mq, code, status.getMessage()); + " clientId={}, consumerGroup={} messageId={}, attempt={}, mq={}, endpoints={}, code={}, " + + "status message={}", clientId, consumerGroup, messageId, attempt, mq, endpoints, code, + status.getMessage()); forwardToDeadLetterQueue(messageView, 1 + attempt, future0); return; } // Log retries. if (1 < attempt) { - LOGGER.info("Re-forward message to dead letter queue successfully, clientId={}, attempt={}, " + - "messageId={}, mq={}", clientId, attempt, messageView.getMessageId(), mq); + LOGGER.info("Re-forward message to dead letter queue successfully, clientId={}, consumerGroup={}, " + + "attempt={}, messageId={}, mq={}, endpoints={}", clientId, consumerGroup, attempt, + messageId, mq, endpoints); + } else { + LOGGER.debug("Forward message to dead letter queue successfully, clientId={}, consumerGroup={}, " + + "messageId={}, mq={}, endpoints={}", clientId, consumerGroup, messageId, mq, endpoints); } // Set result if message is forwarded successfully. - future0.set(null); + future0.setFuture(Futures.immediateVoidFuture()); } @Override public void onFailure(Throwable t) { // Log failure and retry later. LOGGER.error("Exception raised while forward message to DLQ, would attempt to re-forward later, " + - "clientId={}, attempt={}, messageId={}, mq={}", clientId, attempt, - messageView.getMessageId(), mq, t); + "clientId={}, consumerGroup={}, attempt={}, messageId={}, mq={}", clientId, consumerGroup, + attempt, messageId, mq, t); forwardToDeadLetterQueueLater(messageView, 1 + attempt, future0); } }, MoreExecutors.directExecutor()); @@ -518,8 +556,10 @@ class ProcessQueueImpl implements ProcessQueue { private void ackFifoMessage(final MessageViewImpl messageView, final int attempt, final SettableFuture<Void> future0) { - final Endpoints endpoints = messageView.getEndpoints(); final String clientId = consumer.getClientId(); + final String consumerGroup = consumer.getConsumerGroup(); + final MessageId messageId = messageView.getMessageId(); + final Endpoints endpoints = messageView.getEndpoints(); final ListenableFuture<AckMessageResponse> future = consumer.ackMessage(messageView); Futures.addCallback(future, new FutureCallback<AckMessageResponse>() { @Override @@ -528,27 +568,32 @@ class ProcessQueueImpl implements ProcessQueue { final Code code = status.getCode(); // Log failure and retry later. if (!Code.OK.equals(code)) { - LOGGER.error("Failed to ack fifo message, would attempt to re-ack later, clientId={}, attempt={}," + - " messageId={}, mq={}, code={}, endpoints={}, status message=[{}]", - clientId, attempt, messageView.getMessageId(), mq, code, endpoints, status.getMessage()); + LOGGER.error("Failed to ack fifo message, would attempt to re-ack later, clientId={}, " + + "consumerGroup={}, attempt={}, messageId={}, mq={}, code={}, endpoints={}, status " + + "message=[{}]", clientId, consumerGroup, attempt, messageId, mq, code, + endpoints, status.getMessage()); ackFifoMessageLater(messageView, 1 + attempt, future0); return; } // Log retries. if (1 < attempt) { - LOGGER.info("Re-ack fifo message successfully, clientId={}, attempt={}, messageId={}, mq={}," + - " endpoints={}", clientId, attempt, messageView.getMessageId(), mq, endpoints); + LOGGER.info("Re-ack fifo message successfully, clientId={}, consumerGroup={}, attempt={}, " + + "messageId={}, mq={}, endpoints={}", clientId, consumerGroup, attempt, + messageId, mq, endpoints); + } else { + LOGGER.debug("Ack fifo message successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, " + + "endpoints={}", clientId, consumerGroup, messageId, mq, endpoints); } // Set result if FIFO message is acknowledged successfully. - future0.set(null); + future0.setFuture(Futures.immediateVoidFuture()); } @Override public void onFailure(Throwable t) { // Log failure and retry later. - LOGGER.error("Exception raised while ack fifo message, clientId={}, would attempt to re-ack later," + - " attempt={}, messageId={}, mq={}, endpoints={}", - clientId, attempt, messageView.getMessageId(), mq, endpoints, t); + LOGGER.error("Exception raised while acknowledging fifo message, clientId={}, consumerGroup={}, " + + "would attempt to re-ack later, attempt={}, messageId={}, mq={}, endpoints={}", clientId, + consumerGroup, attempt, messageId, mq, endpoints, t); ackFifoMessageLater(messageView, 1 + attempt, future0); } }, MoreExecutors.directExecutor()); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java index 4218424..8879389 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java @@ -23,6 +23,7 @@ import apache.rocketmq.v2.Settings; import apache.rocketmq.v2.Subscription; import apache.rocketmq.v2.SubscriptionEntry; import com.google.common.base.MoreObjects; +import com.google.common.util.concurrent.Futures; import com.google.protobuf.util.Durations; import java.time.Duration; import java.util.ArrayList; @@ -122,7 +123,7 @@ public class PushConsumerSettings extends ClientSettings { default: throw new IllegalArgumentException("Unrecognized backoff policy strategy."); } - this.arrivedFuture.set(null); + this.arrivedFuture.setFuture(Futures.immediateVoidFuture()); } @Override diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java index d357cdf..06a1d7f 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java @@ -22,6 +22,7 @@ import apache.rocketmq.v2.Settings; import apache.rocketmq.v2.Subscription; import apache.rocketmq.v2.SubscriptionEntry; import com.google.common.base.MoreObjects; +import com.google.common.util.concurrent.Futures; import com.google.protobuf.util.Durations; import java.time.Duration; import java.util.ArrayList; @@ -92,7 +93,7 @@ public class SimpleConsumerSettings extends ClientSettings { + "client type={}", clientId, pubSubCase, clientType); return; } - this.arrivedFuture.set(null); + this.arrivedFuture.setFuture(Futures.immediateVoidFuture()); } @Override diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java index 60a8100..9f6f261 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java @@ -20,6 +20,7 @@ package org.apache.rocketmq.client.java.impl.producer; import apache.rocketmq.v2.Publishing; import apache.rocketmq.v2.Settings; import com.google.common.base.MoreObjects; +import com.google.common.util.concurrent.Futures; import com.google.protobuf.util.Durations; import java.time.Duration; import java.util.Set; @@ -94,7 +95,7 @@ public class ProducerSettings extends ClientSettings { final Publishing publishing = settings.getPublishing(); this.compressBodyThresholdBytes = publishing.getCompressBodyThreshold(); this.maxBodySizeBytes = publishing.getMaxBodySize(); - this.arrivedFuture.set(null); + this.arrivedFuture.setFuture(Futures.immediateVoidFuture()); } @Override diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java index 3519385..e7670ed 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java @@ -17,26 +17,10 @@ package org.apache.rocketmq.client.java.impl.producer; -import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doNothing; - -import java.io.IOException; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.concurrent.ExecutionException; import org.apache.rocketmq.client.apis.ClientException; -import org.apache.rocketmq.client.apis.message.Message; -import org.apache.rocketmq.client.apis.message.MessageId; -import org.apache.rocketmq.client.apis.producer.TransactionResolution; -import org.apache.rocketmq.client.java.message.MessageCommon; -import org.apache.rocketmq.client.java.message.PublishingMessageImpl; -import org.apache.rocketmq.client.java.route.Endpoints; import org.apache.rocketmq.client.java.tool.TestBase; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @@ -45,42 +29,6 @@ public class TransactionImplTest extends TestBase { @Mock ProducerImpl producer; - @Test - public void testTryAddMessage() throws IOException, NoSuchFieldException, IllegalAccessException { - final TransactionImpl transaction = new TransactionImpl(producer); - final Message message0 = fakeMessage(FAKE_TOPIC_0); - - final Class<? extends ProducerImpl> clazz = producer.getClass(); - final Field field = clazz.getDeclaredField("producerSettings"); - field.setAccessible(true); - Field modifiersField = Field.class.getDeclaredField("modifiers"); - modifiersField.setAccessible(true); - modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); - field.set(producer, fakeProducerSettings()); - - transaction.tryAddMessage(message0); - // Expect no exception thrown. - } - - @Test(expected = IllegalArgumentException.class) - public void testTryAddMultipleMessages() throws IOException, NoSuchFieldException, IllegalAccessException { - final TransactionImpl transaction = new TransactionImpl(producer); - final Message message0 = fakeMessage(FAKE_TOPIC_0); - final Message message1 = fakeMessage(FAKE_TOPIC_0); - - final Class<? extends ProducerImpl> clazz = producer.getClass(); - final Field field = clazz.getDeclaredField("producerSettings"); - field.setAccessible(true); - Field modifiersField = Field.class.getDeclaredField("modifiers"); - modifiersField.setAccessible(true); - modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); - field.set(producer, fakeProducerSettings()); - - transaction.tryAddMessage(message0); - transaction.tryAddMessage(message1); - // Expect no exception thrown. - } - @Test(expected = IllegalStateException.class) public void testCommitWithNoReceipts() throws ClientException { final TransactionImpl transaction = new TransactionImpl(producer); @@ -92,54 +40,4 @@ public class TransactionImplTest extends TestBase { final TransactionImpl transaction = new TransactionImpl(producer); transaction.rollback(); } - - @Test - public void testCommit() throws IOException, ClientException, ExecutionException, InterruptedException, - NoSuchFieldException, IllegalAccessException { - final TransactionImpl transaction = new TransactionImpl(producer); - final Message message0 = fakeMessage(FAKE_TOPIC_0); - - final Class<? extends ProducerImpl> clazz = producer.getClass(); - final Field field = clazz.getDeclaredField("producerSettings"); - field.setAccessible(true); - Field modifiersField = Field.class.getDeclaredField("modifiers"); - modifiersField.setAccessible(true); - modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); - field.set(producer, fakeProducerSettings()); - - final PublishingMessageImpl publishingMessage = transaction.tryAddMessage(message0); - final SendReceiptImpl receipt = fakeSendReceiptImpl(fakeMessageQueueImpl0()); - transaction.tryAddReceipt(publishingMessage, receipt); - ArgumentCaptor<TransactionResolution> resolutionArgumentCaptor = - ArgumentCaptor.forClass(TransactionResolution.class); - doNothing().when(producer).endTransaction(any(Endpoints.class), any(MessageCommon.class), - any(MessageId.class), anyString(), resolutionArgumentCaptor.capture()); - transaction.commit(); - assertEquals(TransactionResolution.COMMIT, resolutionArgumentCaptor.getValue()); - } - - @Test - public void testRollback() throws IOException, ClientException, ExecutionException, InterruptedException, - NoSuchFieldException, IllegalAccessException { - final TransactionImpl transaction = new TransactionImpl(producer); - final Message message0 = fakeMessage(FAKE_TOPIC_0); - - final Class<? extends ProducerImpl> clazz = producer.getClass(); - final Field field = clazz.getDeclaredField("producerSettings"); - field.setAccessible(true); - Field modifiersField = Field.class.getDeclaredField("modifiers"); - modifiersField.setAccessible(true); - modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); - field.set(producer, fakeProducerSettings()); - - final PublishingMessageImpl publishingMessage = transaction.tryAddMessage(message0); - final SendReceiptImpl receipt = fakeSendReceiptImpl(fakeMessageQueueImpl0()); - transaction.tryAddReceipt(publishingMessage, receipt); - ArgumentCaptor<TransactionResolution> resolutionArgumentCaptor = - ArgumentCaptor.forClass(TransactionResolution.class); - doNothing().when(producer).endTransaction(any(Endpoints.class), any(MessageCommon.class), - any(MessageId.class), anyString(), resolutionArgumentCaptor.capture()); - transaction.rollback(); - assertEquals(TransactionResolution.ROLLBACK, resolutionArgumentCaptor.getValue()); - } } \ No newline at end of file diff --git a/java/style/spotbugs-suppressions.xml b/java/style/spotbugs-suppressions.xml index ca8620f..eb0bb8d 100644 --- a/java/style/spotbugs-suppressions.xml +++ b/java/style/spotbugs-suppressions.xml @@ -6,7 +6,46 @@ </Match> <Match> - <Package name="~org\.apache\.rocketmq\.client.*"/> + <Package name="~org\.apache\.rocketmq\.client\.java.*"/> <Bug pattern="NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE"/> </Match> + + <Match> + <Class name="org.apache.rocketmq.client.java.impl.ClientImpl$2" /> + <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/> + </Match> + + <Match> + <Class name="org.apache.rocketmq.client.java.impl.consumer.ProcessQueueImpl$3" /> + <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/> + </Match> + + <Match> + <Class name="org.apache.rocketmq.client.java.impl.consumer.ProcessQueueImpl$4" /> + <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/> + </Match> + + <Match> + <Class name="org.apache.rocketmq.client.java.impl.consumer.PushConsumerSettings" /> + <Method name="applySettingsCommand" /> + <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/> + </Match> + + <Match> + <Class name="org.apache.rocketmq.client.java.impl.consumer.PushConsumerSettings" /> + <Method name="applySettingsCommand" /> + <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/> + </Match> + + <Match> + <Class name="org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerSettings" /> + <Method name="applySettingsCommand" /> + <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/> + </Match> + + <Match> + <Class name="org.apache.rocketmq.client.java.impl.producer.ProducerSettings" /> + <Method name="applySettingsCommand" /> + <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/> + </Match> </FindBugsFilter>
