syhily commented on code in PR #95:
URL:
https://github.com/apache/flink-connector-pulsar/pull/95#discussion_r1739589754
##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java:
##########
@@ -104,6 +105,19 @@ private MessageId getActualMessageId(MessageIdAdv messageIdImpl) {
if (include) {
return messageIdImpl;
} else {
+ // if the message is batched, should return next single message in current batch.
+ if (messageIdImpl.getBatchIndex() >= 0
+ && messageIdImpl.getBatchSize() > 0
+ && messageIdImpl.getBatchIndex() != messageIdImpl.getBatchSize() - 1) {
+ return new BatchMessageIdImpl(
+ messageIdImpl.getLedgerId(),
+ messageIdImpl.getEntryId(),
+ messageIdImpl.getPartitionIndex(),
+ messageIdImpl.getBatchIndex() + 1,
+ messageIdImpl.getBatchSize(),
+ messageIdImpl.getAckSet());
Review Comment:
1. The receive queue setting
https://github.com/apache/flink-connector-pulsar/blob/b37a8b32f30683664ff25888d403c4de414043e1/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java#L282
is already exposed to the user, with a default value of `1000`.
2. IIUC, batch `AckSet` support is handled locally by the pulsar-client,
once all the messages in the batch have been acked. (BTW, connector users and
developers shouldn't touch it; keeping it correct is the responsibility of the
Pulsar client developers.)
3. Recovery starts from the `MessageId` saved in the checkpoint. I think the
`AckSet` is controlled internally by the client.
##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java:
##########
@@ -196,7 +197,14 @@ public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges
MessageId latestConsumedId = registeredSplit.getLatestConsumedId();
if (latestConsumedId != null) {
- LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId);
+ if (latestConsumedId instanceof BatchMessageIdImpl) {
Review Comment:
I can see that all the message ID implementations implement the `MessageIdAdv`
interface, which contains all the information the client requires. I think it
would be better to use `MessageIdAdv` instead of `MessageId` here and
throughout the connector.
```
/**
* The {@link MessageId} interface provided for advanced users.
* <p>
* All built-in MessageId implementations should be able to be cast to MessageIdAdv.
* </p>
*/
```
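To illustrate the suggestion, here is a minimal self-contained sketch, not the connector's code. `PositionView` and `Position` are hypothetical stand-ins for the `MessageIdAdv` accessors used in the first diff, and the fallback to `entryId + 1` for non-batched messages is my assumption about the existing branch. The point is that the logic only needs the accessors, so no `instanceof BatchMessageIdImpl` check is required.

```java
public class BatchCursorSketch {

    /** Hypothetical stand-in for the MessageIdAdv accessors used in the diff. */
    public interface PositionView {
        long ledgerId();

        long entryId();

        int batchIndex(); // assumed to be -1 when the message is not batched

        int batchSize(); // assumed to be 0 when the message is not batched
    }

    /** Hypothetical value type, used only to keep this sketch self-contained. */
    public record Position(long ledgerId, long entryId, int batchIndex, int batchSize)
            implements PositionView {}

    /** Returns the position right after {@code id}, i.e. the exclusive start. */
    public static Position nextExclusive(PositionView id) {
        // Same condition as in the diff: batched, and not the last message of the batch.
        boolean hasNextInBatch =
                id.batchIndex() >= 0
                        && id.batchSize() > 0
                        && id.batchIndex() != id.batchSize() - 1;
        if (hasNextInBatch) {
            // Stay in the same entry and move to the next message in the batch.
            return new Position(
                    id.ledgerId(), id.entryId(), id.batchIndex() + 1, id.batchSize());
        }
        // Assumption: otherwise move to the next entry of the same ledger.
        return new Position(id.ledgerId(), id.entryId() + 1, -1, 0);
    }
}
```

With this shape, the caller in `handleSplitsChanges` would not need to branch on the concrete class; it would only read the batch index and size from whatever ID it gets.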