[
https://issues.apache.org/jira/browse/BEAM-13608?focusedWorklogId=756997&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-756997
]
ASF GitHub Bot logged work on BEAM-13608:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 14/Apr/22 15:02
Start Date: 14/Apr/22 15:02
Worklog Time Spent: 10m
Work Description: rvballada commented on code in PR #17163:
URL: https://github.com/apache/beam/pull/17163#discussion_r850536477
##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -690,47 +715,136 @@ public Write withQueue(String queue) {
* @param topic The JMS topic name.
* @return The corresponding {@link JmsIO.Read}.
*/
- public Write withTopic(String topic) {
+ public Write<EventT> withTopic(String topic) {
checkArgument(topic != null, "topic can not be null");
return builder().setTopic(topic).build();
}
/** Define the username to connect to the JMS broker (authenticated). */
- public Write withUsername(String username) {
+ public Write<EventT> withUsername(String username) {
checkArgument(username != null, "username can not be null");
return builder().setUsername(username).build();
}
/** Define the password to connect to the JMS broker (authenticated). */
- public Write withPassword(String password) {
+ public Write<EventT> withPassword(String password) {
checkArgument(password != null, "password can not be null");
return builder().setPassword(password).build();
}
+ /**
+ * Specify the JMS topic destination name where to send messages to dynamically. The {@link
+ * JmsIO.Write} acts as a publisher on the topic.
+ *
+ * <p>This method is exclusive with {@link JmsIO.Write#withQueue(String)} and
+ * {@link JmsIO.Write#withTopic(String)}. The user has to specify a Serializable function
+ * that takes the Event Object as parameter, and returns the topic name depending on the content
+ * of the event object.
+ *
+ * <p>For instance:
+ * <pre>{@code
+ * SerializableFunction<SomeEventObject, String> topicNameMapper =
+ *     e -> topicNameFor(e); // topicNameFor is a placeholder for the logic returning the topic name
+ *
+ * }</pre>
+ *
+ * <pre>{@code
+ * .apply(JmsIO.write().withTopicNameMapper(topicNameMapper))
+ *
+ * }</pre>
+ *
+ * @param topicNameMapper The function returning the dynamic topic name.
+ * @return The corresponding {@link JmsIO.Write}.
+ */
+ public Write<EventT> withTopicNameMapper(SerializableFunction<EventT, String> topicNameMapper) {
+ checkArgument(topicNameMapper != null, "topicNameMapper can not be null");
+ return builder().setTopicNameMapper(topicNameMapper).build();
+ }
+
+ /**
+ * Map the Event object with a {@link javax.jms.Message}.
+ *
+ * <p>For instance:
+ *
+ * <pre>{@code
+ * SerializableMessageMapper<VehicleEvent> valueMapper = (e, s) -> {
+ *
+ * try {
+ * TextMessage msg = s.createTextMessage();
+ * msg.setText(Mapper.MAPPER.toJson(e));
+ * return msg;
+ * } catch (JMSException ex) {
+ * throw new JmsIOException("Error!!", ex);
+ * }
+ * };
+ *
+ * }</pre>
+ *
+ * <pre>{@code
+ * .apply(JmsIO.write().withValueMapper(valueMapper))
+ *
+ * }</pre>
+ *
+ * @param valueMapper The function returning the {@link javax.jms.Message}
+ * @return The corresponding {@link JmsIO.Write}.
+ */
+ public Write<EventT> withValueMapper(SerializableMessageMapper<EventT> valueMapper) {
+ checkArgument(valueMapper != null, "valueMapper can not be null");
+ return builder().setValueMapper(valueMapper).build();
+ }
+
+ public Write<EventT> withCoder(Coder<EventT> coder) {
+ checkArgument(coder != null, "coder can not be null");
+ return builder().setCoder(coder).build();
+ }
+
@Override
- public PDone expand(PCollection<String> input) {
+ public WriteJmsResult<EventT> expand(PCollection<EventT> input) {
checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required");
+ checkArgument(getCoder() != null, "withCoder() is required");
Review Comment:
Yes, you're right
Issue Time Tracking
-------------------
Worklog Id: (was: 756997)
Time Spent: 1.5h (was: 1h 20m)
> Dynamic Topics management
> -------------------------
>
> Key: BEAM-13608
> URL: https://issues.apache.org/jira/browse/BEAM-13608
> Project: Beam
> Issue Type: Improvement
> Components: io-java-jms
> Reporter: Vincent BALLADA
> Assignee: Vincent BALLADA
> Priority: P2
> Labels: assigned:
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> The JmsIO write function can publish messages to topics with static names, such as:
> company/employee/id/1234567.
> Some AMQP/JMS brokers provide the ability to publish to dynamic topics like:
> company/employee/id/{employeeId}
> If we want to handle that with Apache Beam JmsIO, we must create a branch per
> employeeId, which is not suitable for a company with thousands of employees, or
> other similar use cases.
> The JmsIO write function should provide the ability to handle dynamic topics.
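
For illustration only, the per-event topic resolution requested above could be sketched as follows. This is a minimal, Beam-free sketch: EmployeeEvent and TopicNameMapper are hypothetical names introduced here, and TopicNameMapper merely stands in for the SerializableFunction<EventT, String> that the PR passes to withTopicNameMapper. It is not the PR's implementation.

```java
import java.io.Serializable;
import java.util.function.Function;

public class DynamicTopicSketch {

    /** Hypothetical event type used only for this illustration. */
    public static final class EmployeeEvent implements Serializable {
        private final String employeeId;

        public EmployeeEvent(String employeeId) {
            this.employeeId = employeeId;
        }

        public String getEmployeeId() {
            return employeeId;
        }
    }

    /**
     * Hypothetical stand-in for a serializable event-to-topic function.
     * It has a single abstract method (apply), so a lambda can implement it.
     */
    public interface TopicNameMapper<T> extends Function<T, String>, Serializable {}

    /** Derives the topic name from the content of each event. */
    public static final TopicNameMapper<EmployeeEvent> TOPIC_NAME_MAPPER =
        e -> "company/employee/id/" + e.getEmployeeId();

    public static void main(String[] args) {
        // One mapper covers every employee, so no per-employee branch is needed.
        System.out.println(TOPIC_NAME_MAPPER.apply(new EmployeeEvent("1234567")));
        System.out.println(TOPIC_NAME_MAPPER.apply(new EmployeeEvent("7654321")));
    }
}
```

With a mapper of this shape, the destination is computed per element at write time instead of being fixed when the pipeline is built.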
--
This message was sent by Atlassian Jira
(v8.20.1#820001)