[ 
https://issues.apache.org/jira/browse/BEAM-13608?focusedWorklogId=756997&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-756997
 ]

ASF GitHub Bot logged work on BEAM-13608:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Apr/22 15:02
            Start Date: 14/Apr/22 15:02
    Worklog Time Spent: 10m 
      Work Description: rvballada commented on code in PR #17163:
URL: https://github.com/apache/beam/pull/17163#discussion_r850536477


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -690,47 +715,136 @@ public Write withQueue(String queue) {
      * @param topic The JMS topic name.
      * @return The corresponding {@link JmsIO.Read}.
      */
-    public Write withTopic(String topic) {
+    public Write<EventT> withTopic(String topic) {
       checkArgument(topic != null, "topic can not be null");
       return builder().setTopic(topic).build();
     }
 
     /** Define the username to connect to the JMS broker (authenticated). */
-    public Write withUsername(String username) {
+    public Write<EventT> withUsername(String username) {
       checkArgument(username != null, "username can not be null");
       return builder().setUsername(username).build();
     }
 
     /** Define the password to connect to the JMS broker (authenticated). */
-    public Write withPassword(String password) {
+    public Write<EventT> withPassword(String password) {
       checkArgument(password != null, "password can not be null");
       return builder().setPassword(password).build();
     }
 
+    /**
+     * Specify the JMS topic destination name where to send messages to dynamically. The {@link
+     * JmsIO.Write} acts as a publisher on the topic.
+     *
+     * <p>This method is exclusive with {@link JmsIO.Write#withQueue(String)} and
+     * {@link JmsIO.Write#withTopic(String)}. The user has to specify a Serializable function
+     * that takes the Event Object as parameter, and returns the topic name depending on the content
+     * of the event object.
+     *
+     * <p>For instance:
+     * <pre>{@code
+     * SerializableFunction<SomeEventObject, String> topicNameMapper =
+     *     // getTopicName() is a placeholder for whatever logic derives the topic name
+     *     e -> e.getTopicName();
+     *
+     * }</pre>
+     *
+     * <pre>{@code
+     * .apply(JmsIO.write().withTopicNameMapper(topicNameMapper))
+     *
+     * }</pre>
+     *
+     * @param topicNameMapper The function returning the dynamic topic name.
+     * @return The corresponding {@link JmsIO.Write}.
+     */
+    public Write<EventT> withTopicNameMapper(SerializableFunction<EventT, String> topicNameMapper) {
+      checkArgument(topicNameMapper != null, "topicNameMapper can not be null");
+      return builder().setTopicNameMapper(topicNameMapper).build();
+    }
+
+    /**
+     * Map the Event object with a {@link javax.jms.Message}.
+     *
+     * <p>For instance:
+     *
+     * <pre>{@code
+     * SerializableMessageMapper<VehicleEvent> valueMapper = (e, s) -> {
+     *
+     *       try {
+     *         TextMessage msg = s.createTextMessage();
+     *         msg.setText(Mapper.MAPPER.toJson(e));
+     *         return msg;
+     *       } catch (JMSException ex) {
+     *         throw new JmsIOException("Error!!", ex);
+     *       }
+     *     };
+     *
+     * }</pre>
+     *
+     * <pre>{@code
+     * .apply(JmsIO.write().withValueMapper(valueMapper))
+     *
+     * }</pre>
+     *
+     * @param valueMapper The function returning the {@link javax.jms.Message}
+     * @return The corresponding {@link JmsIO.Write}.
+     */
+    public Write<EventT> withValueMapper(SerializableMessageMapper<EventT> valueMapper) {
+      checkArgument(valueMapper != null, "valueMapper can not be null");
+      return builder().setValueMapper(valueMapper).build();
+    }
+
+    public Write<EventT> withCoder(Coder<EventT> coder) {
+      checkArgument(coder != null, "coder can not be null");
+      return builder().setCoder(coder).build();
+    }
+
     @Override
-    public PDone expand(PCollection<String> input) {
+    public WriteJmsResult<EventT> expand(PCollection<EventT> input) {
       checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required");
+      checkArgument(getCoder() != null, "withCoder() is required");

Review Comment:
   Yes, you're right





Issue Time Tracking
-------------------

    Worklog Id:     (was: 756997)
    Time Spent: 1.5h  (was: 1h 20m)

> Dynamic Topics management
> -------------------------
>
>                 Key: BEAM-13608
>                 URL: https://issues.apache.org/jira/browse/BEAM-13608
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-jms
>            Reporter: Vincent BALLADA
>            Assignee: Vincent BALLADA
>            Priority: P2
>              Labels: assigned:
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> The JmsIO write function is able to publish messages to topics with static names:
> company/employee/id/1234567.
> Some AMQP/JMS brokers provide the ability to publish to dynamic topics like:
> company/employee/id/{employeeId}
> If we want to handle that with Apache Beam JmsIO, we must create a branch per
> employeeId, which is not suitable for a company with thousands of employees, or
> other similar use cases.
> The JmsIO write function should provide the ability to handle dynamic topics.
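
To make the request in the issue description concrete, here is a minimal sketch in plain Java of deriving the topic name from each event rather than creating one branch per employee. It does not use Beam or JMS: `DynamicTopicSketch`, `EmployeeEvent`, `TopicNameMapper` and `routeByTopic` are hypothetical names introduced only for illustration, and `TopicNameMapper` merely stands in for a serializable function such as the one passed to `withTopicNameMapper` in the diff above.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class DynamicTopicSketch {

  /** Hypothetical event type; stands in for whatever element the pipeline carries. */
  static final class EmployeeEvent implements Serializable {
    final String employeeId;
    final String payload;

    EmployeeEvent(String employeeId, String payload) {
      this.employeeId = employeeId;
      this.payload = payload;
    }
  }

  /** Hypothetical serializable function from an event to its topic name. */
  interface TopicNameMapper<T> extends Function<T, String>, Serializable {}

  /** Groups events by the topic computed for each one, preserving first-seen topic order. */
  static <T> Map<String, List<T>> routeByTopic(List<T> events, TopicNameMapper<T> mapper) {
    Map<String, List<T>> byTopic = new LinkedHashMap<>();
    for (T event : events) {
      byTopic.computeIfAbsent(mapper.apply(event), k -> new ArrayList<>()).add(event);
    }
    return byTopic;
  }

  public static void main(String[] args) {
    // The topic is computed per event, so no per-employee branch is needed.
    TopicNameMapper<EmployeeEvent> mapper = e -> "company/employee/id/" + e.employeeId;

    List<EmployeeEvent> events = List.of(
        new EmployeeEvent("1234567", "badge-in"),
        new EmployeeEvent("7654321", "badge-in"),
        new EmployeeEvent("1234567", "badge-out"));

    routeByTopic(events, mapper)
        .forEach((topic, evts) -> System.out.println(topic + " -> " + evts.size() + " message(s)"));
  }
}
```

In a real pipeline the grouping step would not exist; the writer would presumably resolve the destination per element at publish time. The sketch only shows that a single mapper function covers any number of employee ids.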



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
