ambition119 commented on a change in pull request #4143: [pulsar-spark] upgrade
pulsar-client and add spark example
URL: https://github.com/apache/pulsar/pull/4143#discussion_r279014186
##########
File path:
pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
##########
@@ -19,76 +19,89 @@
package org.apache.pulsar.spark;
import java.io.Serializable;
-import java.util.concurrent.TimeUnit;
+import java.util.Set;
-import org.apache.pulsar.client.api.ClientConfiguration;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
- private ClientConfiguration clientConfiguration;
- private ConsumerConfiguration consumerConfiguration;
+ private String serviceUrl;
+ private ConsumerConfigurationData<byte[]> conf;
+ private Authentication authentication;
private PulsarClient pulsarClient;
- private String url;
- private String topic;
- private String subscription;
+ private Consumer<byte[]> consumer;
- public SparkStreamingPulsarReceiver(ClientConfiguration
clientConfiguration,
- ConsumerConfiguration consumerConfiguration, String url, String
topic, String subscription) {
- this(StorageLevel.MEMORY_AND_DISK_2(), clientConfiguration,
consumerConfiguration, url, topic, subscription);
+ public SparkStreamingPulsarReceiver(
+ String serviceUrl,
+ ConsumerConfigurationData<byte[]> conf,
+ Authentication authentication) {
+ this(StorageLevel.MEMORY_AND_DISK_2(), serviceUrl, conf,
authentication);
}
- public SparkStreamingPulsarReceiver(StorageLevel storageLevel,
ClientConfiguration clientConfiguration,
- ConsumerConfiguration consumerConfiguration, String url, String
topic, String subscription) {
+ public SparkStreamingPulsarReceiver(StorageLevel storageLevel,
+ String serviceUrl,
+ ConsumerConfigurationData<byte[]> conf,
+ Authentication authentication) {
super(storageLevel);
- checkNotNull(clientConfiguration, "ClientConfiguration must not be
null");
- checkNotNull(consumerConfiguration, "ConsumerConfiguration must not be
null");
- this.clientConfiguration = clientConfiguration;
- this.url = url;
- this.topic = topic;
- this.subscription = subscription;
- if (consumerConfiguration.getAckTimeoutMillis() == 0) {
- consumerConfiguration.setAckTimeout(60, TimeUnit.SECONDS);
+
+ checkNotNull(serviceUrl, "serviceUrl must not be null");
+ checkNotNull(conf, "ConsumerConfigurationData must not be null");
+ checkArgument(conf.getTopicNames().size() > 0, "batchTimeMs must be a
positive long.");
Review comment:
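Should fix: the error message "batchTimeMs must be a positive long." does not match the condition being checked. This checkArgument verifies that conf.getTopicNames() is non-empty, so the message should say that at least one topic name must be configured.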
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services