[
https://issues.apache.org/jira/browse/BEAM-3454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16326372#comment-16326372
]
ASF GitHub Bot commented on BEAM-3454:
--------------------------------------
iemejia closed pull request #4415: [BEAM-3454] Use multiple stopping conditions
on JmsIO, AmqpIO, MqttIO.
URL: https://github.com/apache/beam/pull/4415
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
index d2e059b0225..4848d451b8c 100644
--- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
+++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
@@ -160,19 +160,14 @@ public void populateDisplayData(DisplayData.Builder
builder) {
@Override
public PCollection<Message> expand(PBegin input) {
checkArgument(addresses() != null, "withAddresses() is required");
- checkArgument(
- maxReadTime() == null || maxNumRecords() == Long.MAX_VALUE,
- "withMaxNumRecords() and withMaxReadTime() are exclusive");
org.apache.beam.sdk.io.Read.Unbounded<Message> unbounded =
org.apache.beam.sdk.io.Read.from(new UnboundedAmqpSource(this));
PTransform<PBegin, PCollection<Message>> transform = unbounded;
- if (maxNumRecords() != Long.MAX_VALUE) {
- transform = unbounded.withMaxNumRecords(maxNumRecords());
- } else if (maxReadTime() != null) {
- transform = unbounded.withMaxReadTime(maxReadTime());
+ if (maxNumRecords() < Long.MAX_VALUE || maxReadTime() != null) {
+ transform =
unbounded.withMaxReadTime(maxReadTime()).withMaxNumRecords(maxNumRecords());
}
return input.getPipeline().apply(transform);
diff --git
a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index b3a9c8b6f4c..a9bf10c685d 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -287,10 +287,9 @@ public Read withMaxReadTime(Duration maxReadTime) {
PTransform<PBegin, PCollection<JmsRecord>> transform = unbounded;
- if (getMaxNumRecords() != Long.MAX_VALUE) {
- transform = unbounded.withMaxNumRecords(getMaxNumRecords());
- } else if (getMaxReadTime() != null) {
- transform = unbounded.withMaxReadTime(getMaxReadTime());
+ if (getMaxNumRecords() < Long.MAX_VALUE || getMaxReadTime() != null) {
+ transform = unbounded.withMaxReadTime(getMaxReadTime())
+ .withMaxNumRecords(getMaxNumRecords());
}
return input.getPipeline().apply(transform);
diff --git
a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index ef9c9d293d9..5d204b6a3ef 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -260,19 +260,13 @@ public Read withMaxReadTime(Duration maxReadTime) {
@Override
public PCollection<byte[]> expand(PBegin input) {
- checkArgument(
- maxReadTime() == null || maxNumRecords() == Long.MAX_VALUE,
- "withMaxNumRecords() and withMaxReadTime() are exclusive");
-
org.apache.beam.sdk.io.Read.Unbounded<byte[]> unbounded =
org.apache.beam.sdk.io.Read.from(new UnboundedMqttSource(this));
PTransform<PBegin, PCollection<byte[]>> transform = unbounded;
- if (maxNumRecords() != Long.MAX_VALUE) {
- transform = unbounded.withMaxNumRecords(maxNumRecords());
- } else if (maxReadTime() != null) {
- transform = unbounded.withMaxReadTime(maxReadTime());
+ if (maxNumRecords() < Long.MAX_VALUE || maxReadTime() != null) {
+ transform =
unbounded.withMaxReadTime(maxReadTime()).withMaxNumRecords(maxNumRecords());
}
return input.getPipeline().apply(transform);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Make Unbounded to Bounded conversions on IOs able to trigger on maxNumRecords
> and maxReadTime combined
> ------------------------------------------------------------------------------------------------------
>
> Key: BEAM-3454
> URL: https://issues.apache.org/jira/browse/BEAM-3454
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-extensions
> Reporter: Ismaël Mejía
> Assignee: Ryan Skraba
> Priority: Minor
> Fix For: 2.3.0
>
>
> Some Beam IOs expose the ability to turn an unbounded source into a bounded
> source. These IOs have two methods withMaxNumRecords and withMaxReadTime.
> Both methods are exclusive at runtime but they shouldn't be because
> BoundedReadFromUnboundedSource supports to combine the options so the first
> condition reached can stop the source if the user provides both..
> There is already a ticket for KafkaIO and a fix for KinesisIO on the works,
> this ticket is to add this to the other Sources (e.g. MqttIO, JmsIO, etc).
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)