[ https://issues.apache.org/jira/browse/BEAM-7738?focusedWorklogId=289231&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-289231 ]
ASF GitHub Bot logged work on BEAM-7738:
----------------------------------------
Author: ASF GitHub Bot
Created on: 05/Aug/19 22:21
Start Date: 05/Aug/19 22:21
Worklog Time Spent: 10m
Work Description: chadrik commented on pull request #9268: [BEAM-7738] Add external transform support to PubSubIO
URL: https://github.com/apache/beam/pull/9268#discussion_r310815296
##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
##########
@@ -674,6 +680,85 @@ public String toString() {
       abstract Builder<T> setClock(@Nullable Clock clock);
       abstract Read<T> build();
+
+      @Override
+      public PTransform<PBegin, PCollection<T>> buildExternal(External.Configuration config) {
+        if (config.topic != null) {
+          StaticValueProvider<String> topic = StaticValueProvider.of(utf8String(config.topic));
+          setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator()));
+        }
+        if (config.subscription != null) {
+          StaticValueProvider<String> subscription =
+              StaticValueProvider.of(utf8String(config.subscription));
+          setSubscriptionProvider(
+              NestedValueProvider.of(subscription, new SubscriptionTranslator()));
+        }
+        if (config.idAttribute != null) {
+          String idAttribute = utf8String(config.idAttribute);
+          setIdAttribute(idAttribute);
+        }
+        if (config.timestampAttribute != null) {
+          String timestampAttribute = utf8String(config.timestampAttribute);
+          setTimestampAttribute(timestampAttribute);
+        }
+        setNeedsAttributes(config.needsAttributes);
+        setPubsubClientFactory(FACTORY);
+        if (config.needsAttributes) {
+          SimpleFunction<PubsubMessage, T> parseFn =
+              (SimpleFunction<PubsubMessage, T>) new IdentityMessageFn();
+          setParseFn(parseFn);
+          // FIXME: call setCoder(). need to use PubsubMessage proto to be compatible with python
Review comment:
When an external SDK has requested `needsAttributes`, this transform needs
to produce `PubsubMessage` instances rather than byte arrays. There's a
wrinkle here: Python deserializes `PubsubMessage` using protobufs, whereas
Java encodes it with `PubsubMessageWithAttributesCoder`, so the two sides
do not currently agree on the encoding.
I believe we want to use protobufs on the Java side as well. Can I get
confirmation of that, please?
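
For readers without the full file at hand, the null-guarded configuration
pattern in the hunk above can be sketched roughly as follows. This is a
minimal, self-contained illustration and not the code from the PR:
`ExternalConfigSketch`, `Configuration`, and `ReadSettings` are hypothetical
stand-ins for `External.Configuration` and the `Read` builder, the `byte[]`
field types are an assumption inferred from the `utf8String(...)` calls, and
the `utf8String` helper here is a local reimplementation.

```java
import java.nio.charset.StandardCharsets;

public class ExternalConfigSketch {

  /** Hypothetical stand-in for External.Configuration (assumed: nullable UTF-8 bytes). */
  static class Configuration {
    byte[] topic;
    byte[] subscription;
    byte[] idAttribute;
    byte[] timestampAttribute;
    boolean needsAttributes;
  }

  /** Hypothetical stand-in for the Read builder; it only records the decoded values. */
  static class ReadSettings {
    String topic;
    String subscription;
    String idAttribute;
    String timestampAttribute;
    boolean needsAttributes;
  }

  /** Local helper: decode bytes sent by the external SDK as a UTF-8 string. */
  static String utf8String(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }

  /** Copy each optional field only when the external SDK actually supplied it. */
  static ReadSettings buildExternal(Configuration config) {
    ReadSettings settings = new ReadSettings();
    if (config.topic != null) {
      settings.topic = utf8String(config.topic);
    }
    if (config.subscription != null) {
      settings.subscription = utf8String(config.subscription);
    }
    if (config.idAttribute != null) {
      settings.idAttribute = utf8String(config.idAttribute);
    }
    if (config.timestampAttribute != null) {
      settings.timestampAttribute = utf8String(config.timestampAttribute);
    }
    // The boolean flag is never null, so it is copied unconditionally.
    settings.needsAttributes = config.needsAttributes;
    return settings;
  }

  public static void main(String[] args) {
    Configuration config = new Configuration();
    config.topic = "projects/p/topics/t".getBytes(StandardCharsets.UTF_8);
    config.needsAttributes = true;
    ReadSettings settings = buildExternal(config);
    System.out.println(
        settings.topic + " " + settings.subscription + " " + settings.needsAttributes);
  }
}
```

The sketch deliberately stops short of the open question in this comment:
it does not choose a coder for the `needsAttributes` case.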
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 289231)
Time Spent: 20m (was: 10m)
> Support PubSubIO to be configured externally for use with other SDKs
> --------------------------------------------------------------------
>
> Key: BEAM-7738
> URL: https://issues.apache.org/jira/browse/BEAM-7738
> Project: Beam
> Issue Type: New Feature
> Components: io-java-gcp, runner-flink, sdk-py-core
> Reporter: Chad Dombrova
> Assignee: Chad Dombrova
> Priority: Major
> Labels: portability
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Now that KafkaIO is supported via the external transform API (BEAM-7029) we
> should add support for PubSub.