valdar commented on a change in pull request #428:
URL:
https://github.com/apache/camel-kafka-connector/pull/428#discussion_r484557883
##########
File path:
core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
##########
@@ -119,53 +119,63 @@ public void start(Map<String, String> props) {
}
}
+ private long remaining(long startPollEpochMilli, long maxPollDuration) {
+ return maxPollDuration - (Instant.now().toEpochMilli() -
startPollEpochMilli);
+ }
+
+
@Override
public synchronized List<SourceRecord> poll() {
- long startPollEpochMilli = Instant.now().toEpochMilli();
+ final long startPollEpochMilli = Instant.now().toEpochMilli();
+
+ long remaining = remaining(startPollEpochMilli, maxPollDuration);
long collectedRecords = 0L;
- List<SourceRecord> records = new ArrayList<>();
- while (collectedRecords < maxBatchPollSize &&
(Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) {
- Exchange exchange = consumer.receiveNoWait();
+ List<SourceRecord> records = null;
+ while (collectedRecords < maxBatchPollSize && remaining > 0) {
+ Exchange exchange = consumer.receive(remaining);
+ if (exchange == null) {
Review comment:
what is the point of this check? if the exchange is null, since you
receive waiting for the whole remaining time it means you have finished the
time so there is no point in updating it earlier and continue.
I would rather just check if exchange is not `null` and in that case do all
the work except updating remaining time, i.e. something like:
```java
while(...) {
Exchange exchange = consumer.receive(remaining);
if(exchange != null) {
//whole code here
}
remaining = remaining(startPollEpochMilli, maxPollDuration);
}
...
```
##########
File path:
core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
##########
@@ -119,53 +119,63 @@ public void start(Map<String, String> props) {
}
}
+ private long remaining(long startPollEpochMilli, long maxPollDuration) {
+ return maxPollDuration - (Instant.now().toEpochMilli() -
startPollEpochMilli);
+ }
+
+
@Override
public synchronized List<SourceRecord> poll() {
- long startPollEpochMilli = Instant.now().toEpochMilli();
+ final long startPollEpochMilli = Instant.now().toEpochMilli();
+
+ long remaining = remaining(startPollEpochMilli, maxPollDuration);
long collectedRecords = 0L;
- List<SourceRecord> records = new ArrayList<>();
- while (collectedRecords < maxBatchPollSize &&
(Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) {
- Exchange exchange = consumer.receiveNoWait();
+ List<SourceRecord> records = null;
+ while (collectedRecords < maxBatchPollSize && remaining > 0) {
+ Exchange exchange = consumer.receive(remaining);
+ if (exchange == null) {
Review comment:
I might have completely overlooked something here not understanding your
motivations, feel free to disagree and point me wrong. After all I was the
author of the buggy/not performant previous version of this section of the
codebase :P
##########
File path:
core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
##########
@@ -119,53 +119,63 @@ public void start(Map<String, String> props) {
}
}
+ private long remaining(long startPollEpochMilli, long maxPollDuration) {
+ return maxPollDuration - (Instant.now().toEpochMilli() -
startPollEpochMilli);
+ }
+
+
@Override
public synchronized List<SourceRecord> poll() {
- long startPollEpochMilli = Instant.now().toEpochMilli();
+ final long startPollEpochMilli = Instant.now().toEpochMilli();
+
+ long remaining = remaining(startPollEpochMilli, maxPollDuration);
long collectedRecords = 0L;
- List<SourceRecord> records = new ArrayList<>();
- while (collectedRecords < maxBatchPollSize &&
(Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) {
- Exchange exchange = consumer.receiveNoWait();
+ List<SourceRecord> records = null;
+ while (collectedRecords < maxBatchPollSize && remaining > 0) {
+ Exchange exchange = consumer.receive(remaining);
+ if (exchange == null) {
+ remaining = remaining(startPollEpochMilli, maxPollDuration);
+ continue;
+ }
+
+ if (records == null) {
Review comment:
I would rather initialize `records` with and empty list check at the end
if it is empty to return `null` for readability, than getting (little?) gain
in performance...
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]