This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8adff85 issue#3939 : Allow client authentication from pulsar-flink
package (#3949)
8adff85 is described below
commit 8adff85d8fece38428f28c1288c0ebb7dde3a744
Author: Shivji Kumar Jha <[email protected]>
AuthorDate: Tue Apr 2 21:32:01 2019 +0530
issue#3939 : Allow client authentication from pulsar-flink package (#3949)
Problem:
========
pulsar-flink module (aka flink connector) internally uses pulsar-client.
Though the pulsar client allows setting tokens in the client builder, the flink
connector does not provide a way to pass an authentication token to the pulsar
client it uses internally.
Solution:
========
Accept authentication information as an input in the pulsar-flink module. Pass
this authentication information to pulsar-client.
---
.../example/FlinkPulsarBatchAvroSinkExample.java | 3 +-
.../example/FlinkPulsarBatchCsvSinkExample.java | 3 +-
.../example/FlinkPulsarBatchJsonSinkExample.java | 3 +-
.../example/FlinkPulsarBatchSinkExample.java | 3 +-
.../example/PulsarConsumerSourceWordCount.java | 2 +
...lsarConsumerSourceWordCountToAvroTableSink.java | 3 +-
...lsarConsumerSourceWordCountToJsonTableSink.java | 3 +-
.../FlinkPulsarBatchAvroSinkScalaExample.scala | 3 +-
.../FlinkPulsarBatchCsvSinkScalaExample.scala | 3 +-
.../FlinkPulsarBatchJsonSinkScalaExample.scala | 3 +-
.../example/FlinkPulsarBatchSinkScalaExample.scala | 3 +-
.../connectors/pulsar/BasePulsarOutputFormat.java | 17 +++---
.../connectors/pulsar/PulsarAvroOutputFormat.java | 5 +-
.../connectors/pulsar/PulsarCsvOutputFormat.java | 5 +-
.../connectors/pulsar/PulsarJsonOutputFormat.java | 5 +-
.../connectors/pulsar/PulsarOutputFormat.java | 5 +-
.../connectors/pulsar/FlinkPulsarProducer.java | 14 ++++-
.../connectors/pulsar/PulsarAvroTableSink.java | 8 ++-
.../connectors/pulsar/PulsarConsumerSource.java | 4 ++
.../connectors/pulsar/PulsarJsonTableSink.java | 7 ++-
.../connectors/pulsar/PulsarSourceBuilder.java | 61 ++++++++++++++++++++++
.../connectors/pulsar/PulsarTableSink.java | 5 ++
.../pulsar/PulsarAvroOutputFormatTest.java | 11 ++--
.../pulsar/PulsarCsvOutputFormatTest.java | 11 ++--
.../pulsar/PulsarJsonOutputFormatTest.java | 11 ++--
.../connectors/pulsar/PulsarOutputFormatTest.java | 15 +++---
.../connectors/pulsar/PulsarAvroTableSinkTest.java | 6 ++-
.../connectors/pulsar/PulsarJsonTableSinkTest.java | 6 ++-
28 files changed, 174 insertions(+), 54 deletions(-)
diff --git
a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
index 1349dba..6d077f9 100644
---
a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
+++
b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.avro.generated.NasaMission;
import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import java.util.Arrays;
import java.util.List;
@@ -63,7 +64,7 @@ public class FlinkPulsarBatchAvroSinkExample {
System.out.println("\tTopic:\t" + topic);
// create PulsarAvroOutputFormat instance
- final OutputFormat<NasaMission> pulsarAvroOutputFormat = new
PulsarAvroOutputFormat<>(serviceUrl, topic);
+ final OutputFormat<NasaMission> pulsarAvroOutputFormat = new
PulsarAvroOutputFormat<>(serviceUrl, topic, new AuthenticationDisabled());
// create DataSet
DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
diff --git
a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
index 3e658dc..4abb0a4 100644
---
a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
+++
b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import java.util.Arrays;
import java.util.List;
@@ -65,7 +66,7 @@ public class FlinkPulsarBatchCsvSinkExample {
// create PulsarCsvOutputFormat instance
final OutputFormat<Tuple4<Integer, String, Integer, Integer>>
pulsarCsvOutputFormat =
- new PulsarCsvOutputFormat<>(serviceUrl, topic);
+ new PulsarCsvOutputFormat<>(serviceUrl, topic, new
AuthenticationDisabled());
// create DataSet
DataSet<Tuple4<Integer, String, Integer, Integer>> nasaMissionDS =
env.fromCollection(nasaMissions);
diff --git
a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
index 3937ae9..dc56364 100644
---
a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
+++
b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import java.util.Arrays;
import java.util.List;
@@ -62,7 +63,7 @@ public class FlinkPulsarBatchJsonSinkExample {
System.out.println("\tTopic:\t" + topic);
// create PulsarJsonOutputFormat instance
- final OutputFormat<NasaMission> pulsarJsonOutputFormat = new
PulsarJsonOutputFormat<>(serviceUrl, topic);
+ final OutputFormat<NasaMission> pulsarJsonOutputFormat = new
PulsarJsonOutputFormat<>(serviceUrl, topic, new AuthenticationDisabled());
// create DataSet
DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
diff --git
a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
index c90d016..2c89579 100644
---
a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
+++
b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat;
import org.apache.flink.util.Collector;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
/**
* Implements a batch word-count program on Pulsar topic by writing Flink
DataSet.
@@ -60,7 +61,7 @@ public class FlinkPulsarBatchSinkExample {
// create PulsarOutputFormat instance
final OutputFormat pulsarOutputFormat =
- new PulsarOutputFormat(serviceUrl, topic, wordWithCount ->
wordWithCount.toString().getBytes());
+ new PulsarOutputFormat(serviceUrl, topic, new
AuthenticationDisabled(), wordWithCount -> wordWithCount.toString().getBytes());
// create DataSet
DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
diff --git
a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
index 942ddc1..a323f8f 100644
---
a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
+++
b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
@@ -35,6 +35,7 @@ import
org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
/**
* Implements a streaming wordcount program on pulsar topics.
@@ -97,6 +98,7 @@ public class PulsarConsumerSourceWordCount {
wc.addSink(new FlinkPulsarProducer<>(
serviceUrl,
outputTopic,
+ new AuthenticationDisabled(),
wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
wordWithCount -> wordWithCount.word
)).setParallelism(parallelism);
diff --git
a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
index 9fdc9a2..84e85e8 100644
---
a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
+++
b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
/**
* Implements a streaming wordcount program on pulsar topics.
@@ -107,7 +108,7 @@ public class PulsarConsumerSourceWordCountToAvroTableSink {
table.printSchema();
TableSink sink = null;
if (null != outputTopic) {
- sink = new PulsarAvroTableSink(serviceUrl, outputTopic,
ROUTING_KEY, WordWithCount.class);
+ sink = new PulsarAvroTableSink(serviceUrl, outputTopic, new
AuthenticationDisabled(), ROUTING_KEY, WordWithCount.class);
} else {
// print the results with a csv file
sink = new CsvTableSink("./examples/file", "|");
diff --git
a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
index 1be9dde..a4f9c3c 100644
---
a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
+++
b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
@@ -37,6 +37,7 @@ import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
/**
* Implements a streaming wordcount program on pulsar topics.
@@ -108,7 +109,7 @@ public class PulsarConsumerSourceWordCountToJsonTableSink {
table.printSchema();
TableSink sink = null;
if (null != outputTopic) {
- sink = new PulsarJsonTableSink(serviceUrl, outputTopic,
ROUTING_KEY);
+ sink = new PulsarJsonTableSink(serviceUrl, outputTopic, new
AuthenticationDisabled(), ROUTING_KEY);
} else {
// print the results with a csv file
sink = new CsvTableSink("./examples/file", "|");
diff --git
a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
index f10d6c1..a64e656 100644
---
a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
+++
b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.avro.generated.NasaMission
import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled
/**
* Implements a batch Scala program on Pulsar topic by writing Flink DataSet
as Avro.
@@ -59,7 +60,7 @@ object FlinkPulsarBatchAvroSinkScalaExample {
// create PulsarCsvOutputFormat instance
val pulsarAvroOutputFormat =
- new PulsarAvroOutputFormat[NasaMission](serviceUrl, topic)
+ new PulsarAvroOutputFormat[NasaMission](serviceUrl, topic, new
AuthenticationDisabled())
// create DataSet
val textDS = env.fromCollection(nasaMissions)
diff --git
a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
index 3233616..302d0ab 100644
---
a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
+++
b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple4
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled
/**
* Implements a batch Scala program on Pulsar topic by writing Flink DataSet
as Csv.
@@ -65,7 +66,7 @@ object FlinkPulsarBatchCsvSinkScalaExample {
// create PulsarCsvOutputFormat instance
val pulsarCsvOutputFormat =
- new PulsarCsvOutputFormat[NasaMission](serviceUrl, topic)
+ new PulsarCsvOutputFormat[NasaMission](serviceUrl, topic, new
AuthenticationDisabled())
// create DataSet
val textDS = env.fromCollection(nasaMissions)
diff --git
a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
index 60d02e5..9518751 100644
---
a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
+++
b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat
import scala.beans.BeanProperty
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled
/**
* Implements a batch Scala program on Pulsar topic by writing Flink DataSet
as Json.
@@ -66,7 +67,7 @@ object FlinkPulsarBatchJsonSinkScalaExample {
println("\tTopic:\t" + topic)
// create PulsarJsonOutputFormat instance
- val pulsarJsonOutputFormat = new
PulsarJsonOutputFormat[NasaMission](serviceUrl, topic)
+ val pulsarJsonOutputFormat = new
PulsarJsonOutputFormat[NasaMission](serviceUrl, topic, new
AuthenticationDisabled())
// create DataSet
val nasaMissionDS = env.fromCollection(nasaMissions)
diff --git
a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
index 4de0dcb..369e56d 100644
---
a/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
+++
b/examples/flink/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat
import org.apache.flink.util.Collector
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled
/**
* Data type for words with count.
@@ -63,7 +64,7 @@ object FlinkPulsarBatchSinkScalaExample {
// create PulsarOutputFormat instance
val pulsarOutputFormat =
- new PulsarOutputFormat[WordWithCount](serviceUrl, topic, new
SerializationSchema[WordWithCount] {
+ new PulsarOutputFormat[WordWithCount](serviceUrl, topic, new
AuthenticationDisabled(), new SerializationSchema[WordWithCount] {
override def serialize(wordWithCount: WordWithCount): Array[Byte] =
wordWithCount.toString.getBytes
})
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
index ca34327..644c8e9 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Authentication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,14 +47,16 @@ public abstract class BasePulsarOutputFormat<T> extends
RichOutputFormat<T> {
protected final String serviceUrl;
protected final String topicName;
+ private final Authentication authentication;
protected SerializationSchema<T> serializationSchema;
- protected BasePulsarOutputFormat(final String serviceUrl, final String
topicName) {
+ protected BasePulsarOutputFormat(final String serviceUrl, final String
topicName, final Authentication authentication) {
Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl),
"serviceUrl cannot be blank.");
Preconditions.checkArgument(StringUtils.isNotBlank(topicName),
"topicName cannot be blank.");
this.serviceUrl = serviceUrl;
this.topicName = topicName;
+ this.authentication = authentication;
LOG.info("PulsarOutputFormat is being started to write batches to
Pulsar topic: {}", this.topicName);
}
@@ -65,7 +68,7 @@ public abstract class BasePulsarOutputFormat<T> extends
RichOutputFormat<T> {
@Override
public void open(int taskNumber, int numTasks) throws IOException {
- this.producer = getProducerInstance(serviceUrl, topicName);
+ this.producer = getProducerInstance(serviceUrl, topicName,
authentication);
this.failureCallback = cause -> {
LOG.error("Error while sending record to Pulsar: " +
cause.getMessage(), cause);
@@ -85,11 +88,12 @@ public abstract class BasePulsarOutputFormat<T> extends
RichOutputFormat<T> {
}
- private static Producer<byte[]> getProducerInstance(String serviceUrl,
String topicName) throws PulsarClientException {
+ private static Producer<byte[]> getProducerInstance(String serviceUrl,
String topicName, Authentication authentication)
+ throws PulsarClientException {
if(producer == null){
synchronized (PulsarOutputFormat.class) {
if(producer == null){
- producer =
Preconditions.checkNotNull(createPulsarProducer(serviceUrl, topicName),
+ producer =
Preconditions.checkNotNull(createPulsarProducer(serviceUrl, topicName,
authentication),
"Pulsar producer cannot be null.");
}
}
@@ -97,9 +101,10 @@ public abstract class BasePulsarOutputFormat<T> extends
RichOutputFormat<T> {
return producer;
}
- private static Producer<byte[]> createPulsarProducer(String serviceUrl,
String topicName) throws PulsarClientException {
+ private static Producer<byte[]> createPulsarProducer(String serviceUrl,
String topicName, Authentication authentication)
+ throws PulsarClientException {
try {
- PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl).build();
+ PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
return client.newProducer().topic(topicName).create();
} catch (PulsarClientException e) {
LOG.error("Pulsar producer cannot be created.", e);
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
index d15dfe7..52484ef 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
@@ -20,6 +20,7 @@ package org.apache.flink.batch.connectors.pulsar;
import org.apache.avro.specific.SpecificRecord;
import
org.apache.flink.batch.connectors.pulsar.serialization.AvroSerializationSchema;
+import org.apache.pulsar.client.api.Authentication;
/**
* Pulsar Avro Output Format to write Flink DataSets into a Pulsar topic in
Avro format.
@@ -28,8 +29,8 @@ public class PulsarAvroOutputFormat<T extends SpecificRecord>
extends BasePulsar
private static final long serialVersionUID = -6794070714728773530L;
- public PulsarAvroOutputFormat(String serviceUrl, String topicName) {
- super(serviceUrl, topicName);
+ public PulsarAvroOutputFormat(String serviceUrl, String topicName,
Authentication authentication) {
+ super(serviceUrl, topicName, authentication);
this.serializationSchema = new AvroSerializationSchema();
}
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
index adae9f7..d36a260 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
@@ -20,6 +20,7 @@ package org.apache.flink.batch.connectors.pulsar;
import org.apache.flink.api.java.tuple.Tuple;
import
org.apache.flink.batch.connectors.pulsar.serialization.CsvSerializationSchema;
+import org.apache.pulsar.client.api.Authentication;
/**
* Pulsar Csv Output Format to write Flink DataSets into a Pulsar topic in Csv
format.
@@ -28,8 +29,8 @@ public class PulsarCsvOutputFormat<T extends Tuple> extends
BasePulsarOutputForm
private static final long serialVersionUID = -4461671510903404196L;
- public PulsarCsvOutputFormat(String serviceUrl, String topicName) {
- super(serviceUrl, topicName);
+ public PulsarCsvOutputFormat(String serviceUrl, String topicName,
Authentication authentication) {
+ super(serviceUrl, topicName, authentication);
this.serializationSchema = new CsvSerializationSchema<>();
}
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
index 3fe5baa..96d7a01 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
@@ -19,6 +19,7 @@
package org.apache.flink.batch.connectors.pulsar;
import
org.apache.flink.batch.connectors.pulsar.serialization.JsonSerializationSchema;
+import org.apache.pulsar.client.api.Authentication;
/**
* Pulsar Json Output Format to write Flink DataSets into a Pulsar topic in
Json format.
@@ -27,8 +28,8 @@ public class PulsarJsonOutputFormat<T> extends
BasePulsarOutputFormat<T> {
private static final long serialVersionUID = 8499620770848461958L;
- public PulsarJsonOutputFormat(String serviceUrl, String topicName) {
- super(serviceUrl, topicName);
+ public PulsarJsonOutputFormat(String serviceUrl, String topicName,
Authentication authentication) {
+ super(serviceUrl, topicName, authentication);
this.serializationSchema = new JsonSerializationSchema();
}
}
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
index 889970f..393faaf 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
@@ -20,6 +20,7 @@ package org.apache.flink.batch.connectors.pulsar;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.util.Preconditions;
+import org.apache.pulsar.client.api.Authentication;
/**
* Pulsar Output Format to write Flink DataSets into a Pulsar topic in
user-defined format.
@@ -28,8 +29,8 @@ public class PulsarOutputFormat<T> extends
BasePulsarOutputFormat<T> {
private static final long serialVersionUID = 2997027580167793000L;
- public PulsarOutputFormat(String serviceUrl, String topicName, final
SerializationSchema<T> serializationSchema) {
- super(serviceUrl, topicName);
+ public PulsarOutputFormat(String serviceUrl, String topicName,
Authentication authentication, final SerializationSchema<T>
serializationSchema) {
+ super(serviceUrl, topicName, authentication);
Preconditions.checkNotNull(serializationSchema, "serializationSchema
cannot be null.");
this.serializationSchema = serializationSchema;
}
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index c0d3905..55eb619 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,6 +66,12 @@ public class FlinkPulsarProducer<IN>
protected final String defaultTopicName;
/**
+ * Pulsar client will use this authentication information, if required.
+ */
+ private final Authentication authentication;
+
+
+ /**
* (Serializable) SerializationSchema for turning objects used with Flink
into.
* byte[] for Pulsar.
*/
@@ -121,13 +128,15 @@ public class FlinkPulsarProducer<IN>
public FlinkPulsarProducer(String serviceUrl,
String defaultTopicName,
+ Authentication authentication,
SerializationSchema<IN> serializationSchema,
PulsarKeyExtractor<IN> keyExtractor) {
- this(serviceUrl, defaultTopicName, serializationSchema, keyExtractor,
null);
+ this(serviceUrl, defaultTopicName, authentication,
serializationSchema, keyExtractor, null);
}
public FlinkPulsarProducer(String serviceUrl,
String defaultTopicName,
+ Authentication authentication,
SerializationSchema<IN> serializationSchema,
PulsarKeyExtractor<IN> keyExtractor,
Map<String, Object> producerConfig) {
@@ -135,6 +144,7 @@ public class FlinkPulsarProducer<IN>
checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName
cannot be blank");
this.serviceUrl = serviceUrl;
this.defaultTopicName = defaultTopicName;
+ this.authentication = authentication;
this.schema = checkNotNull(serializationSchema, "Serialization Schema
not set");
this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
ClosureCleaner.ensureSerializable(serializationSchema);
@@ -190,7 +200,7 @@ public class FlinkPulsarProducer<IN>
}
private Producer<byte[]> createProducer() throws Exception {
- PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl).build();
+ PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
ProducerBuilder<byte[]> producerBuilder = client.newProducer();
if (producerConfig != null) {
producerBuilder = producerBuilder.loadConf(producerConfig);
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
index b370345..20999fd 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
@@ -36,6 +36,7 @@ import
org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtract
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.Authentication;
/**
* An append-only table sink to emit a streaming table as a Pulsar stream that
serializes data in Avro format.
@@ -44,6 +45,7 @@ public class PulsarAvroTableSink implements
AppendStreamTableSink<Row> {
protected final String serviceUrl;
protected final String topic;
+ protected final Authentication authentication;
protected final String routingKeyFieldName;
protected SerializationSchema<Row> serializationSchema;
protected String[] fieldNames;
@@ -56,16 +58,17 @@ public class PulsarAvroTableSink implements
AppendStreamTableSink<Row> {
*
* @param serviceUrl pulsar service url
* @param topic topic in pulsar to which table is written
- * @param producerConf producer configuration
* @param routingKeyFieldName routing key field name
*/
public PulsarAvroTableSink(
String serviceUrl,
String topic,
+ Authentication authentication,
String routingKeyFieldName,
Class<? extends SpecificRecord> recordClazz) {
this.serviceUrl = checkNotNull(serviceUrl, "Service url not set");
this.topic = checkNotNull(topic, "Topic is null");
+ this.authentication = checkNotNull(authentication, "authentication is
null, set new AuthenticationDisabled() instead");
this.routingKeyFieldName = routingKeyFieldName;
this.recordClazz = recordClazz;
}
@@ -78,6 +81,7 @@ public class PulsarAvroTableSink implements
AppendStreamTableSink<Row> {
return new FlinkPulsarProducer<Row>(
serviceUrl,
topic,
+ authentication,
serializationSchema,
keyExtractor);
}
@@ -110,7 +114,7 @@ public class PulsarAvroTableSink implements
AppendStreamTableSink<Row> {
@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[]
fieldTypes) {
- PulsarAvroTableSink sink = new PulsarAvroTableSink(serviceUrl, topic,
routingKeyFieldName, recordClazz);
+ PulsarAvroTableSink sink = new PulsarAvroTableSink(serviceUrl, topic,
authentication, routingKeyFieldName, recordClazz);
sink.fieldNames = checkNotNull(fieldNames, "Field names are null");
sink.fieldTypes = checkNotNull(fieldTypes, "Field types are null");
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index 6479bf0..5af82bc 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.Authentication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +59,7 @@ class PulsarConsumerSource<T> extends
MessageAcknowledgingSourceBase<T, MessageI
private final int messageReceiveTimeoutMs = 100;
private final String serviceUrl;
private final Set<String> topicNames;
+ private final Authentication authentication;
private final Pattern topicsPattern;
private final String subscriptionName;
private final DeserializationSchema<T> deserializer;
@@ -75,6 +77,7 @@ class PulsarConsumerSource<T> extends
MessageAcknowledgingSourceBase<T, MessageI
PulsarConsumerSource(PulsarSourceBuilder<T> builder) {
super(MessageId.class);
this.serviceUrl = builder.serviceUrl;
+ this.authentication = builder.authentication;
this.topicNames = builder.topicNames;
this.topicsPattern = builder.topicsPattern;
this.deserializer = builder.deserializationSchema;
@@ -191,6 +194,7 @@ class PulsarConsumerSource<T> extends
MessageAcknowledgingSourceBase<T, MessageI
PulsarClient createClient() throws PulsarClientException {
return PulsarClient.builder()
.serviceUrl(serviceUrl)
+ .authentication(authentication)
.build();
}
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
index 45c2642..c37250d 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
@@ -22,6 +22,7 @@ import
org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.Authentication;
/**
* Base class for {@link PulsarTableSink} that serializes data in JSON format.
@@ -33,14 +34,15 @@ public class PulsarJsonTableSink extends PulsarTableSink {
*
* @param serviceUrl pulsar service url
* @param topic topic in pulsar to which table is written
- * @param producerConf producer configuration
+ * @param authentication authentication info required by pulsar client
* @param routingKeyFieldName routing key field name
*/
public PulsarJsonTableSink(
String serviceUrl,
String topic,
+ Authentication authentication,
String routingKeyFieldName) {
- super(serviceUrl, topic, routingKeyFieldName);
+ super(serviceUrl, topic, authentication, routingKeyFieldName);
}
@Override
@@ -53,6 +55,7 @@ public class PulsarJsonTableSink extends PulsarTableSink {
return new PulsarJsonTableSink(
serviceUrl,
topic,
+ authentication,
routingKeyFieldName);
}
}
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
index 3f30390..3b78495 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
@@ -23,12 +23,16 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.PulsarClientException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;
+import java.util.Map;
/**
* A class for building a pulsar source.
@@ -43,6 +47,7 @@ public class PulsarSourceBuilder<T> {
final DeserializationSchema<T> deserializationSchema;
String serviceUrl = SERVICE_URL;
final Set<String> topicNames = new TreeSet<>();
+ Authentication authentication;
Pattern topicsPattern;
String subscriptionName = "flink-sub";
long acknowledgementBatchSize = ACKNOWLEDGEMENT_BATCH_SIZE;
@@ -163,6 +168,62 @@ public class PulsarSourceBuilder<T> {
throw new IllegalArgumentException("acknowledgementBatchSize can only take values > 0 and <= " + MAX_ACKNOWLEDGEMENT_BATCH_SIZE);
}
+ /**
+ * Set the authentication provider to use in the Pulsar client instance.
+ *
+ * @param authentication an instance of the {@link Authentication} provider already constructed
+ * @return this builder
+ */
+ public PulsarSourceBuilder<T> authentication(Authentication authentication) {
+ Preconditions.checkArgument(authentication != null,
+ "authentication instance can not be null, use new AuthenticationDisabled() to disable authentication");
+ this.authentication = authentication;
+ return this;
+ }
+
+ /**
+ * Configure the authentication provider to use in the Pulsar client instance
+ *
+ * @param authPluginClassName
+ * name of the Authentication-Plugin to use
+ * @param authParamsString
+ * string which represents parameters for the Authentication-Plugin, e.g., "key1:val1,key2:val2"
+ * @return this builder
+ * @throws PulsarClientException.UnsupportedAuthenticationException
+ * failed to instantiate specified Authentication-Plugin
+ */
+ public PulsarSourceBuilder<T> authentication(String authPluginClassName, String authParamsString)
+ throws PulsarClientException.UnsupportedAuthenticationException {
+ Preconditions.checkArgument(StringUtils.isNotBlank(authPluginClassName),
+ "Authentication-Plugin class name can not be blank");
+ Preconditions.checkArgument(StringUtils.isNotBlank(authParamsString),
+ "Authentication-Plugin parameters can not be blank");
+ this.authentication = AuthenticationFactory.create(authPluginClassName, authParamsString);
+ return this;
+ }
+
+ /**
+ * Configure the authentication provider to use in the Pulsar client instance
+ * using a config map.
+ *
+ * @param authPluginClassName
+ * name of the Authentication-Plugin you want to use
+ * @param authParams
+ * map which represents parameters for the Authentication-Plugin
+ * @return this builder
+ * @throws PulsarClientException.UnsupportedAuthenticationException
+ * failed to instantiate specified Authentication-Plugin
+ */
+ public PulsarSourceBuilder<T> authentication(String authPluginClassName, Map<String, String> authParams)
+ throws PulsarClientException.UnsupportedAuthenticationException {
+ Preconditions.checkArgument(StringUtils.isNotBlank(authPluginClassName),
+ "Authentication-Plugin class name can not be blank");
+ Preconditions.checkArgument((authParams != null && authParams.isEmpty() == false),
+ "parameters to authentication plugin can not be null/empty");
+ this.authentication = AuthenticationFactory.create(authPluginClassName, authParams);
+ return this;
+ }
+
public SourceFunction<T> build() {
Preconditions.checkNotNull(serviceUrl, "a service url is required");
Preconditions.checkArgument((topicNames != null &&
!topicNames.isEmpty()) || topicsPattern != null,
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
index 0fc45f7..5d20a1d 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
@@ -33,6 +33,7 @@ import
org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtract
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.Authentication;
/**
* An append-only table sink to emit a streaming table as a Pulsar stream.
@@ -41,6 +42,7 @@ public abstract class PulsarTableSink implements
AppendStreamTableSink<Row> {
protected final String serviceUrl;
protected final String topic;
+ protected Authentication authentication;
protected SerializationSchema<Row> serializationSchema;
protected PulsarKeyExtractor<Row> keyExtractor;
protected String[] fieldNames;
@@ -50,9 +52,11 @@ public abstract class PulsarTableSink implements
AppendStreamTableSink<Row> {
public PulsarTableSink(
String serviceUrl,
String topic,
+ Authentication authentication,
String routingKeyFieldName) {
this.serviceUrl = checkNotNull(serviceUrl, "Service url not set");
this.topic = checkNotNull(topic, "Topic is null");
+ this.authentication = checkNotNull(authentication, "authentication is null, set new AuthenticationDisabled() instead");
this.routingKeyFieldName = routingKeyFieldName;
}
@@ -78,6 +82,7 @@ public abstract class PulsarTableSink implements
AppendStreamTableSink<Row> {
return new FlinkPulsarProducer<Row>(
serviceUrl,
topic,
+ authentication,
serializationSchema,
keyExtractor);
}
diff --git
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
index 62c3b5d..bedcbda 100644
---
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
+++
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.flink.batch.connectors.pulsar;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.testng.annotations.Test;
import static org.testng.Assert.assertNotNull;
@@ -30,28 +31,28 @@ public class PulsarAvroOutputFormatTest {
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarAvroOutputFormatConstructorWhenServiceUrlIsNull() {
- new PulsarAvroOutputFormat(null, "testTopic");
+ new PulsarAvroOutputFormat(null, "testTopic", new
AuthenticationDisabled());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarAvroOutputFormatConstructorWhenTopicNameIsNull() {
- new PulsarAvroOutputFormat("testServiceUrl", null);
+ new PulsarAvroOutputFormat("testServiceUrl", null, new
AuthenticationDisabled());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarAvroOutputFormatConstructorWhenTopicNameIsBlank() {
- new PulsarAvroOutputFormat("testServiceUrl", " ");
+ new PulsarAvroOutputFormat("testServiceUrl", " ", new
AuthenticationDisabled());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarAvroOutputFormatConstructorWhenServiceUrlIsBlank() {
- new PulsarAvroOutputFormat(" ", "testTopic");
+ new PulsarAvroOutputFormat(" ", "testTopic", new
AuthenticationDisabled());
}
@Test
public void testPulsarAvroOutputFormatConstructor() {
PulsarAvroOutputFormat pulsarAvroOutputFormat =
- new PulsarAvroOutputFormat("testServiceUrl", "testTopic");
+ new PulsarAvroOutputFormat("testServiceUrl", "testTopic", new
AuthenticationDisabled());
assertNotNull(pulsarAvroOutputFormat);
}
}
diff --git
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
index a564a89..caccb6b 100644
---
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
+++
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.flink.batch.connectors.pulsar;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.testng.annotations.Test;
import static org.testng.Assert.assertNotNull;
@@ -29,28 +30,28 @@ public class PulsarCsvOutputFormatTest {
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarCsvOutputFormatConstructorWhenServiceUrlIsNull() {
- new PulsarCsvOutputFormat(null, "testTopic");
+ new PulsarCsvOutputFormat(null, "testTopic", new
AuthenticationDisabled());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarCsvOutputFormatConstructorWhenTopicNameIsNull() {
- new PulsarCsvOutputFormat("testServiceUrl", null);
+ new PulsarCsvOutputFormat("testServiceUrl", null, new
AuthenticationDisabled());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarCsvOutputFormatConstructorWhenTopicNameIsBlank() {
- new PulsarCsvOutputFormat("testServiceUrl", " ");
+ new PulsarCsvOutputFormat("testServiceUrl", " ", new
AuthenticationDisabled());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarCsvOutputFormatConstructorWhenServiceUrlIsBlank() {
- new PulsarCsvOutputFormat(" ", "testTopic");
+ new PulsarCsvOutputFormat(" ", "testTopic", new
AuthenticationDisabled());
}
@Test
public void testPulsarCsvOutputFormatConstructor() {
PulsarCsvOutputFormat pulsarCsvOutputFormat =
- new PulsarCsvOutputFormat("testServiceUrl", "testTopic");
+ new PulsarCsvOutputFormat("testServiceUrl", "testTopic", new
AuthenticationDisabled());
assertNotNull(pulsarCsvOutputFormat);
}
}
diff --git
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
index b9953cf..4ab7232 100644
---
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
+++
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.flink.batch.connectors.pulsar;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.testng.annotations.Test;
import static org.testng.Assert.assertNotNull;
@@ -29,28 +30,28 @@ public class PulsarJsonOutputFormatTest {
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarJsonOutputFormatConstructorWhenServiceUrlIsNull() {
- new PulsarJsonOutputFormat(null, "testTopic");
+ new PulsarJsonOutputFormat(null, "testTopic", new
AuthenticationDisabled());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarJsonOutputFormatConstructorWhenTopicNameIsNull() {
- new PulsarJsonOutputFormat("testServiceUrl", null);
+ new PulsarJsonOutputFormat("testServiceUrl", null, new
AuthenticationDisabled());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarJsonOutputFormatConstructorWhenTopicNameIsBlank() {
- new PulsarJsonOutputFormat("testServiceUrl", " ");
+ new PulsarJsonOutputFormat("testServiceUrl", " ", new
AuthenticationDisabled());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarJsonOutputFormatConstructorWhenServiceUrlIsBlank() {
- new PulsarJsonOutputFormat(" ", "testTopic");
+ new PulsarJsonOutputFormat(" ", "testTopic", new
AuthenticationDisabled());
}
@Test
public void testPulsarJsonOutputFormatConstructor() {
PulsarJsonOutputFormat pulsarJsonOutputFormat =
- new PulsarJsonOutputFormat("testServiceUrl", "testTopic");
+ new PulsarJsonOutputFormat("testServiceUrl", "testTopic", new
AuthenticationDisabled());
assertNotNull(pulsarJsonOutputFormat);
}
}
diff --git
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
index 41cf8b2..238c49b 100644
---
a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
+++
b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.batch.connectors.pulsar;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.testng.annotations.Test;
import java.io.IOException;
@@ -35,34 +36,34 @@ public class PulsarOutputFormatTest {
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarOutputFormatConstructorWhenServiceUrlIsNull() {
- new PulsarOutputFormat(null, "testTopic", text ->
text.toString().getBytes());
+ new PulsarOutputFormat(null, "testTopic", new
AuthenticationDisabled(), text -> text.toString().getBytes());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarOutputFormatConstructorWhenTopicNameIsNull() {
- new PulsarOutputFormat("testServiceUrl", null, text ->
text.toString().getBytes());
+ new PulsarOutputFormat("testServiceUrl", null, new
AuthenticationDisabled(), text -> text.toString().getBytes());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarOutputFormatConstructorWhenTopicNameIsBlank() {
- new PulsarOutputFormat("testServiceUrl", " ", text ->
text.toString().getBytes());
+ new PulsarOutputFormat("testServiceUrl", " ", new
AuthenticationDisabled(), text -> text.toString().getBytes());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarOutputFormatConstructorWhenServiceUrlIsBlank() {
- new PulsarOutputFormat(" ", "testTopic", text ->
text.toString().getBytes());
+ new PulsarOutputFormat(" ", "testTopic", new AuthenticationDisabled(),
text -> text.toString().getBytes());
}
@Test(expectedExceptions = NullPointerException.class)
public void
testPulsarOutputFormatConstructorWhenSerializationSchemaIsNull() {
- new PulsarOutputFormat("testServiceUrl", "testTopic", null);
+ new PulsarOutputFormat("testServiceUrl", "testTopic", new
AuthenticationDisabled(), null);
}
@Test
public void testPulsarOutputFormatWithStringSerializationSchema() throws
IOException {
String input = "Wolfgang Amadeus Mozart";
PulsarOutputFormat pulsarOutputFormat =
- new PulsarOutputFormat("testServiceUrl", "testTopic",
+ new PulsarOutputFormat("testServiceUrl", "testTopic", new
AuthenticationDisabled(),
text -> text.toString().getBytes());
assertNotNull(pulsarOutputFormat);
byte[] bytes = pulsarOutputFormat.serializationSchema.serialize(input);
@@ -74,7 +75,7 @@ public class PulsarOutputFormatTest {
public void testPulsarOutputFormatWithCustomSerializationSchema() throws
IOException {
Employee employee = new Employee(1, "Test Employee", "Test
Department");
PulsarOutputFormat pulsarOutputFormat =
- new PulsarOutputFormat("testServiceUrl", "testTopic",
+ new PulsarOutputFormat("testServiceUrl", "testTopic", new
AuthenticationDisabled(),
new EmployeeSerializationSchema());
assertNotNull(pulsarOutputFormat);
diff --git
a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
index 85bb2ed..125ee4a 100644
---
a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
+++
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import org.powermock.api.mockito.PowerMockito;
@@ -39,6 +41,7 @@ import static
org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
public class PulsarAvroTableSinkTest {
private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String TOPIC_NAME = "test_topic";
+ private static final Authentication AUTHENTICATION = new
AuthenticationDisabled();
private static final String ROUTING_KEY = "name";
private final String[] fieldNames = {"id", "name","start_year","end_year"};
@@ -86,13 +89,14 @@ public class PulsarAvroTableSinkTest {
private PulsarAvroTableSink spySink() throws Exception {
- PulsarAvroTableSink sink = new PulsarAvroTableSink(SERVICE_URL,
TOPIC_NAME, ROUTING_KEY, NasaMission.class);
+ PulsarAvroTableSink sink = new PulsarAvroTableSink(SERVICE_URL,
TOPIC_NAME, AUTHENTICATION, ROUTING_KEY, NasaMission.class);
FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class);
PowerMockito.whenNew(
FlinkPulsarProducer.class
).withArguments(
Mockito.anyString(),
Mockito.anyString(),
+ Mockito.any(Authentication.class),
Mockito.any(SerializationSchema.class),
Mockito.any(PulsarKeyExtractor.class)
).thenReturn(producer);
diff --git
a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
index 9ceefff..c42ae6c 100644
---
a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
+++
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import org.powermock.api.mockito.PowerMockito;
@@ -39,6 +41,7 @@ public class PulsarJsonTableSinkTest {
private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String TOPIC_NAME = "test_topic";
+ private static final Authentication AUTHENTICATION = new
AuthenticationDisabled();
private static final String ROUTING_KEY = "key";
private final String[] fieldNames = {"key", "value"};
private final TypeInformation[] typeInformations = {
@@ -80,13 +83,14 @@ public class PulsarJsonTableSinkTest {
}
private PulsarJsonTableSink spySink() throws Exception {
- PulsarJsonTableSink sink = new PulsarJsonTableSink(SERVICE_URL,
TOPIC_NAME, ROUTING_KEY);
+ PulsarJsonTableSink sink = new PulsarJsonTableSink(SERVICE_URL,
TOPIC_NAME, AUTHENTICATION, ROUTING_KEY);
FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class);
PowerMockito.whenNew(
FlinkPulsarProducer.class
).withArguments(
Mockito.anyString(),
Mockito.anyString(),
+ Mockito.any(Authentication.class),
Mockito.any(SerializationSchema.class),
Mockito.any(PulsarKeyExtractor.class)
).thenReturn(producer);