[ 
https://issues.apache.org/jira/browse/BEAM-8513?focusedWorklogId=337420&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-337420
 ]

ASF GitHub Bot logged work on BEAM-8513:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Nov/19 17:12
            Start Date: 01/Nov/19 17:12
    Worklog Time Spent: 10m 
      Work Description: jkff commented on pull request #9937: [BEAM-8513] Allow reads from exchange-bound queue without declaring the exchange
URL: https://github.com/apache/beam/pull/9937#discussion_r341664641
 
 

 ##########
 File path: sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java
 ##########
 @@ -233,26 +255,73 @@ public Read withQueue(String queue) {
 
     /**
      * You can "force" the declaration of a queue on the RabbitMQ broker. Exchanges and queues are
-     * the high-level building blocks of AMQP. These must be "declared" before they can be used.
-     * Declaring either type of object simply ensures that one of that name exists, creating it if
-     * necessary.
+     * the high-level building blocks of AMQP. These must be "declared" (created) before they can be
+     * used. Declaring either type of object ensures that one of that name and of the specified
+     * properties exists, creating it if necessary.
      *
-     * @param queueDeclare If {@code true}, {@link RabbitMqIO} will declare the queue. If another
-     *     application declare the queue, it's not required.
+     * <p>NOTE: When declaring a queue or exchange that already exists, the properties specified in
+     * the declaration must match those of the existing queue or exchange. That is, if you declare a
+     * queue to be non-durable but a durable queue already exists with the same name, the
+     * declaration will fail. When declaring a queue, RabbitMqIO will declare it to be non-durable.
+     *
+     * @param queueDeclare If {@code true}, {@link RabbitMqIO} will declare a non-durable queue. If
+     *     another application created the queue, this is not required and should be set to {@code
+     *     false}
      */
     public Read withQueueDeclare(boolean queueDeclare) {
       return builder().setQueueDeclare(queueDeclare).build();
     }
 
     /**
-     * Instead of consuming messages on a specific queue, you can consume message from a given
-     * exchange. Then you specify the exchange name, type and optionally routing key where you want
-     * to consume messages.
+     * In AMQP, messages are published to an exchange and routed to queues based on the exchange
+     * type and a queue binding. Most exchange types utilize the routingKey to determine which
+     * queues to deliver messages to. It is incumbent upon the developer to understand the paradigm
+     * in place to determine whether to declare a queue, what the appropriate binding should be, and
+     * what routingKey will be in use.
+     *
+     * <p>This function should be used if the Beam pipeline will be responsible for declaring the
+     * exchange. As a result of calling this function, {@code exchangeDeclare} will be set to {@code
+     * true} and the resulting exchange will be non-durable and of the supplied type. If an exchange
+     * with the given name already exists but is durable or is of another type, exchange declaration
+     * will fail.
+     *
+     * <p>To use an exchange without declaring it, especially for cases when the exchange is shared
+     * with other applications or already exists, use {@link #withExchange(String, String)} instead.
+     *
+     * @see
+     *     "https://www.cloudamqp.com/blog/2015-09-03-part4-rabbitmq-for-beginners-exchanges-routing-keys-bindings.html"
+     *     for a write-up on exchange types and routing semantics
      */
-    public Read withExchange(String name, String type, String routingKey) {
-      checkArgument(name != null, "name can not be null");
-      checkArgument(type != null, "type can not be null");
-      return builder().setExchange(name).setExchangeType(type).setRoutingKey(routingKey).build();
+    public Read withExchange(String name, String type, @Nullable String routingKey) {
 
 Review comment:
   I think it might make sense to rename this function and the other one to
   `withDeclareExchange` and `withExistingExchange`, respectively. Keep the
   original functions for backwards compatibility, though: mark them
   deprecated, make them simply delegate to the non-deprecated versions, and
   reduce their documentation to a one-liner pointing to those versions.
   
   The same applies to the write transform.
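   
   A minimal sketch of the delegation pattern suggested above. `ReadSketch` is a hypothetical, simplified stand-in for `RabbitMqIO.Read` (the real transform is built through a builder), and the assumption that the second argument of the two-argument `withExchange` is the routing key is not confirmed by the diff:

```java
/** Hypothetical, simplified stand-in for RabbitMqIO.Read; not the actual Beam class. */
final class ReadSketch {
  final String exchange;
  final String exchangeType; // null when the exchange is not declared by the pipeline
  final String routingKey; // may be null
  final boolean exchangeDeclare;

  private ReadSketch(
      String exchange, String exchangeType, String routingKey, boolean exchangeDeclare) {
    this.exchange = exchange;
    this.exchangeType = exchangeType;
    this.routingKey = routingKey;
    this.exchangeDeclare = exchangeDeclare;
  }

  static ReadSketch create() {
    return new ReadSketch(null, null, null, false);
  }

  /** Reads from an exchange that the pipeline itself declares (non-durable, given type). */
  ReadSketch withDeclareExchange(String name, String type, String routingKey) {
    if (name == null) {
      throw new IllegalArgumentException("name can not be null");
    }
    if (type == null) {
      throw new IllegalArgumentException("type can not be null");
    }
    return new ReadSketch(name, type, routingKey, true);
  }

  /** Reads from an exchange that already exists, without declaring it. */
  ReadSketch withExistingExchange(String name, String routingKey) {
    if (name == null) {
      throw new IllegalArgumentException("name can not be null");
    }
    return new ReadSketch(name, null, routingKey, false);
  }

  /** @deprecated Use {@link #withDeclareExchange(String, String, String)} instead. */
  @Deprecated
  ReadSketch withExchange(String name, String type, String routingKey) {
    return withDeclareExchange(name, type, routingKey);
  }

  /** @deprecated Use {@link #withExistingExchange(String, String)} instead. */
  @Deprecated
  ReadSketch withExchange(String name, String routingKey) {
    return withExistingExchange(name, routingKey);
  }
}
```

   Existing callers keep compiling against the two `withExchange` overloads, while the new names make it visible at the call site whether the pipeline will declare the exchange.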
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 337420)
    Time Spent: 2h 50m  (was: 2h 40m)

> RabbitMqIO: Allow reads from exchange-bound queue without declaring the exchange
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-8513
>                 URL: https://issues.apache.org/jira/browse/BEAM-8513
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-rabbitmq
>         Environment: testing with DirectRunner
>            Reporter: Nick Aldwin
>            Assignee: Jean-Baptiste Onofré
>            Priority: Critical
>             Fix For: 2.18.0
>
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> The RabbitMqIO always declares an exchange if it is configured to read from 
> it.  This is problematic with pre-existing exchanges (a relatively common 
> pattern), as there's no provided configuration for the exchange beyond 
> exchange type.  (We stumbled on this because RabbitMqIO always declares a 
> non-durable exchange, which fails if the exchange already exists as a durable 
> exchange)
>  
> A solution to this would be to allow RabbitMqIO to read from an exchange 
> without declaring it.  This pattern is already available for queues via the 
> `queueDeclare` flag.  I propose an `exchangeDeclare` flag which preserves 
> existing behavior except for skipping the call to `exchangeDeclare` before 
> binding the queue to the exchange.
>  
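
The control flow proposed in the description can be sketched as follows. This is only an illustration under stated assumptions: `BrokerOps`, `RecordingBroker`, and `ExchangeSetup` are hypothetical names introduced here, not part of RabbitMqIO or the RabbitMQ client, and the recording implementation exists only so the flow can be inspected without a broker:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical abstraction over the two broker calls involved; not the RabbitMQ client API. */
interface BrokerOps {
  void declareExchange(String name, String type);

  void bindQueue(String queue, String exchange, String routingKey);
}

/** Records the calls it receives instead of talking to a broker. */
final class RecordingBroker implements BrokerOps {
  final List<String> calls = new ArrayList<>();

  @Override
  public void declareExchange(String name, String type) {
    calls.add("declare " + name + " " + type);
  }

  @Override
  public void bindQueue(String queue, String exchange, String routingKey) {
    calls.add("bind " + queue + " -> " + exchange);
  }
}

final class ExchangeSetup {
  /**
   * Binds the queue to the exchange, declaring the exchange first only when asked to. With
   * exchangeDeclare set to false, a pre-existing (for example durable) exchange is left untouched.
   */
  static void bind(
      BrokerOps broker,
      String queue,
      String exchange,
      String type,
      String routingKey,
      boolean exchangeDeclare) {
    if (exchangeDeclare) {
      broker.declareExchange(exchange, type);
    }
    broker.bindQueue(queue, exchange, routingKey);
  }
}
```

With the flag set to `true` the declare call still precedes the bind, which corresponds to the existing behavior the description says should be preserved.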


