[ https://issues.apache.org/jira/browse/BEAM-8218?focusedWorklogId=665371&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-665371 ]
ASF GitHub Bot logged work on BEAM-8218:
----------------------------------------
Author: ASF GitHub Bot
Created on: 13/Oct/21 21:13
Start Date: 13/Oct/21 21:13
Worklog Time Spent: 10m
Work Description: MarcoRob commented on a change in pull request #15572:
URL: https://github.com/apache/beam/pull/15572#discussion_r728451852
##########
File path:
sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
##########
@@ -0,0 +1,185 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@DoFn.UnboundedPerElement
+public class PulsarIO<K, V> extends DoFn<PulsarSource, PulsarRecord<K,V>> {
+
+ private PulsarClient client;
+ private String topic;
+ private List<String> topics;
+ private PulsarAdmin admin;
+
+ private String clientUrl;
+ private String adminUrl;
+
+
+ public void setServiceUrl(String clientUrl, String adminUrl) throws PulsarClientException {
+ this.clientUrl = clientUrl;
+ this.adminUrl = adminUrl;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public void setTopics(List<String> topics) {
+ this.topics = topics;
+ }
+
+ private void initPulsarClients() throws PulsarClientException {
+ if(this.adminUrl == null && this.clientUrl == null) {
+ this.adminUrl = PulsarIOUtils.SERVICE_HTTP_URL;
+ this.clientUrl = PulsarIOUtils.SERVICE_URL;
+ }
+ this.client = PulsarClient.builder()
+ .serviceUrl(clientUrl)
+ .build();
+
+ //TODO fix auth for admin connection
+ boolean tlsAllowInsecureConnection = false;
+ String tlsTrustCertsFilePath = null;
+ this.admin = PulsarAdmin.builder()
+ // .authentication(authPluginClassName,authParams)
+ .serviceHttpUrl(adminUrl)
+ .tlsTrustCertsFilePath(tlsTrustCertsFilePath)
+ .allowTlsInsecureConnection(tlsAllowInsecureConnection)
+ .build();
+ }
+
+ private void closePulsarClients() throws PulsarClientException {
+ this.admin.close();
+ this.client.close();
+ }
+
+ // Open connection to Pulsar clients
+ @Setup
+ public void setup() throws Exception {
+ this.initPulsarClients();
+ }
+ // Close connection to Pulsar clients
+ @Teardown
+ public void teardown() throws Exception {
+ this.closePulsarClients();
+ }
+
+ @GetInitialRestriction
+ public OffsetRange getInitialRestriction(@Element PulsarSource pulsarSource) {
+ // Reading a topic from starting point with offset 0
+ long startOffset = 0;
+ if(pulsarSource.getStartOffset() != null) {
+ startOffset = pulsarSource.getStartOffset();
+ }
+
+ return new OffsetRange(startOffset, Long.MAX_VALUE);
+ }
+
+ /*
+ It may define a DoFn.GetSize method or ensure that the RestrictionTracker implements
+ RestrictionTracker.HasProgress. Poor auto-scaling of workers and/or splitting may result
+ if size or progress is an inaccurate representation of work.
+ See DoFn.GetSize and RestrictionTracker.HasProgress for further details.
+ */
+ @GetSize
+ public double getSize(@Element PulsarSource pulsarSource, @Restriction OffsetRange offsetRange) {
+ //TODO improve getsize estiamate, check pulsar stats to improve get size estimate
+ // https://pulsar.apache.org/docs/en/admin-api-topics/#get-stats
+ double estimateRecords = restrictionTracker(pulsarSource, offsetRange).getProgress().getWorkRemaining();
+ return estimateRecords;
+ }
+
+ private Reader<byte[]> newReader(PulsarClient client, MessageId startMessageId) throws PulsarClientException {
+ ReaderBuilder<byte[]> builder = client.newReader().topic(topic).startMessageId(startMessageId);
+ return builder.create();
+ }
+
+ @ProcessElement
+ public ProcessContinuation processElement(
+ @Element PulsarRecord pulsarRecord,
+ RestrictionTracker<OffsetRange, Long> tracker,
+ OutputReceiver<PulsarRecord> output) throws IOException {
+
+ long startOffset = tracker.currentRestriction().getFrom();
+ //long expectedOffset = startOffset;
+ MessageId startMessageId = (startOffset != 0) ?
+ MessageIdUtils.getMessageId(startOffset) : MessageId.earliest;
+
+ //TODO: if topic is partitioned need to create n readers for n topic-partitions
+ try(Reader<byte[]> reader = newReader(client, startMessageId)) {
Review comment:
You can increase the number of partitions but not decrease it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 665371)
Time Spent: 2h 10m (was: 2h)
> Implement Apache PulsarIO
> -------------------------
>
> Key: BEAM-8218
> URL: https://issues.apache.org/jira/browse/BEAM-8218
> Project: Beam
> Issue Type: Task
> Components: io-ideas
> Reporter: Alex Van Boxel
> Assignee: Marco Robles
> Priority: P3
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> Apache Pulsar is starting to gain popularity. Having a native Beam PulsarIO
> could be beneficial.
> [https://pulsar.apache.org/|https://pulsar.apache.org/en/]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)