[
https://issues.apache.org/jira/browse/BEAM-4086?focusedWorklogId=94300&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94300
]
ASF GitHub Bot logged work on BEAM-4086:
----------------------------------------
Author: ASF GitHub Bot
Created on: 23/Apr/18 21:04
Start Date: 23/Apr/18 21:04
Worklog Time Spent: 10m
Work Description: iemejia closed pull request #5185: [BEAM-4086]: KafkaIO
tests: Avoid busy loop in MockConsumer.poll(), reduce flakes.
URL: https://github.com/apache/beam/pull/5185
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 5819a671275..7f9959a6579 100644
---
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -30,6 +30,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
@@ -248,14 +249,23 @@ public void assign(final Collection<TopicPartition>
assigned) {
@Override
public void run() {
// add all the records with offset >= current partition position.
+ int recordsAdded = 0;
for (TopicPartition tp : assignedPartitions.get()) {
long curPos = consumer.position(tp);
for (ConsumerRecord<byte[], byte[]> r : records.get(tp)) {
if (r.offset() >= curPos) {
consumer.addRecord(r);
+ recordsAdded++;
}
}
}
+ if (recordsAdded == 0) {
+ // MockConsumer.poll(timeout) does not actually wait even when there
aren't any records.
+ // Add a small wait here in order to avoid busy looping in the
reader.
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+ //TODO: BEAM-4086: testUnboundedSourceWithoutBoundedWrapper()
occasionally hangs
+ // without this wait. Need to look into it.
+ }
consumer.schedulePollTask(this);
}
};
@@ -605,10 +615,12 @@ public Instant getWatermark(PartitionContext ctx) {
@Test
public void testUnboundedSourceWithoutBoundedWrapper() {
+ // This is same as testUnboundedSource() without the BoundedSource wrapper.
// Most of the tests in this file set 'maxNumRecords' on the source, which
wraps
// the unbounded source in a bounded source. As a result, the test
pipeline run as
// bounded/batch pipelines under direct-runner.
- // This is same as testUnboundedSource() without the BoundedSource wrapper.
+ // This tests runs without such a wrapper over unbounded wrapper, and
depends on watermark
+ // progressing to infinity to end the test (see
TimestampPolicyWithEndOfSource above).
final int numElements = 1000;
final int numPartitions = 10;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 94300)
Time Spent: 40m (was: 0.5h)
> KafkaIOTest is flaky
> --------------------
>
> Key: BEAM-4086
> URL: https://issues.apache.org/jira/browse/BEAM-4086
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka
> Affects Versions: 2.5.0
> Reporter: Ismaël Mejía
> Assignee: Raghu Angadi
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Noticed this while trying to do a simple change in KafkaIO this morning and
> corroborated with other contributors. If you run `./gradlew -p
> sdks/java/io/kafka/ clean build` it blocks indefinitely at least 1/3 of the
> times. However it passes ok with Maven.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)