This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4956859 [FLINK-26931][Connector/pulsar] Make the producer name and
consumer name unique for each instance.
4956859 is described below
commit 495685970e31085815ba0435322ab44e4504cd55
Author: Yufan Sheng <[email protected]>
AuthorDate: Wed Mar 30 20:19:51 2022 +0800
[FLINK-26931][Connector/pulsar] Make the producer name and consumer name
unique for each instance.
---
.../pulsar/common/config/PulsarClientFactory.java | 18 ++++++++++--------
.../pulsar/common/config/PulsarConfiguration.java | 2 +-
.../flink/connector/pulsar/sink/PulsarSinkBuilder.java | 5 +++++
.../pulsar/sink/config/PulsarSinkConfigUtils.java | 6 +++++-
.../connector/pulsar/sink/writer/PulsarWriter.java | 2 +-
.../connector/pulsar/source/PulsarSourceBuilder.java | 7 ++++++-
.../pulsar/source/config/PulsarSourceConfigUtils.java | 6 +++++-
.../flink/connector/pulsar/sink/PulsarSinkITCase.java | 2 +-
8 files changed, 34 insertions(+), 14 deletions(-)
diff --git
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java
index b1214b5..1f01b24 100644
---
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java
+++
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java
@@ -153,7 +153,7 @@ public final class PulsarClientFactory {
/**
* PulsarAdmin shares almost the same configuration with PulsarClient, but
we separate this
- * create method for directly creating it.
+ * creating method for directly use it.
*/
public static PulsarAdmin createAdmin(PulsarConfiguration configuration) {
PulsarAdminBuilder builder = PulsarAdmin.builder();
@@ -200,15 +200,17 @@ public final class PulsarClientFactory {
String authParamsString =
configuration.get(PULSAR_AUTH_PARAMS);
return sneakyClient(
() ->
AuthenticationFactory.create(authPluginClassName, authParamsString));
- } else if (configuration.contains(PULSAR_AUTH_PARAM_MAP)) {
- Map<String, String> paramsMap =
configuration.get(PULSAR_AUTH_PARAM_MAP);
+ } else {
+ Map<String, String> paramsMap =
configuration.getProperties(PULSAR_AUTH_PARAM_MAP);
+ if (paramsMap.isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "No %s or %s provided",
+ PULSAR_AUTH_PARAMS.key(),
PULSAR_AUTH_PARAM_MAP.key()));
+ }
+
return sneakyClient(
() ->
AuthenticationFactory.create(authPluginClassName, paramsMap));
- } else {
- throw new IllegalArgumentException(
- String.format(
- "No %s or %s provided",
- PULSAR_AUTH_PARAMS.key(),
PULSAR_AUTH_PARAM_MAP.key()));
}
}
diff --git
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfiguration.java
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfiguration.java
index 3e64c66..0681a3e 100644
---
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfiguration.java
+++
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfiguration.java
@@ -76,7 +76,7 @@ public abstract class PulsarConfiguration extends
UnmodifiableConfiguration {
return properties;
}
- /** Get an option value from the given config, convert it into the a new value instance. */
+ /** Get an option value from the given config, convert it into a new value instance. */
public <F, T> T get(ConfigOption<F> option, Function<F, T> convertor) {
F value = get(option);
if (value != null) {
diff --git
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
index 1668e3d..332a27a 100644
---
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
+++
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
@@ -315,6 +315,11 @@ public class PulsarSinkBuilder<IN> {
if (!configBuilder.contains(PULSAR_PRODUCER_NAME)) {
LOG.warn(
"We recommend set a readable producer name through setProducerName(String) in production mode.");
+ } else {
+ String producerName = configBuilder.get(PULSAR_PRODUCER_NAME);
+ if (!producerName.contains("%s")) {
+ configBuilder.override(PULSAR_PRODUCER_NAME, producerName + " - %s");
+ }
}
checkNotNull(serializationSchema, "serializationSchema must be set.");
diff --git
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
index 13821fe..50f0e15 100644
---
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
+++
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import java.util.Map;
+import java.util.UUID;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -70,7 +71,10 @@ public final class PulsarSinkConfigUtils {
PulsarClient client, Schema<T> schema, SinkConfiguration
configuration) {
ProducerBuilder<T> builder = client.newProducer(schema);
- configuration.useOption(PULSAR_PRODUCER_NAME, builder::producerName);
+ configuration.useOption(
+ PULSAR_PRODUCER_NAME,
+ producerName -> String.format(producerName, UUID.randomUUID()),
+ builder::producerName);
configuration.useOption(
PULSAR_SEND_TIMEOUT_MS,
Math::toIntExact,
diff --git
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
index 1e4113a..dec4f52 100644
---
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
+++
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
@@ -189,7 +189,7 @@ public class PulsarWriter<IN> implements
PrecommittingSinkWriter<IN, PulsarCommi
this.pendingMessages -= 1;
}
- @SuppressWarnings("rawtypes")
+ @SuppressWarnings({"rawtypes", "unchecked"})
private TypedMessageBuilder<?> createMessageBuilder(
String topic, Context context, PulsarMessage<?> message) {
diff --git
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
index 517286d..ed68215 100644
---
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
+++
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
@@ -473,9 +473,14 @@ public final class PulsarSourceBuilder<OUT> {
if (!configBuilder.contains(PULSAR_CONSUMER_NAME)) {
LOG.warn(
"We recommend set a readable consumer name through setConsumerName(String) in production mode.");
+ } else {
+ String consumerName = configBuilder.get(PULSAR_CONSUMER_NAME);
+ if (!consumerName.contains("%s")) {
+ configBuilder.override(PULSAR_CONSUMER_NAME, consumerName + " - %s");
+ }
}
- // Since these implementation could be a lambda, make sure they are serializable.
+ // Since these implementations could be a lambda, make sure they are serializable.
checkState(isSerializable(startCursor), "StartCursor isn't
serializable");
checkState(isSerializable(stopCursor), "StopCursor isn't
serializable");
checkState(isSerializable(rangeGenerator), "RangeGenerator isn't
serializable");
diff --git
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
index adb8a03..602a157 100644
---
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
+++
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.Schema;
import java.util.Map;
import java.util.Optional;
+import java.util.UUID;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -106,7 +107,10 @@ public final class PulsarSourceConfigUtils {
configuration.useOption(
PULSAR_MAX_TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS,
builder::maxTotalReceiverQueueSizeAcrossPartitions);
- configuration.useOption(PULSAR_CONSUMER_NAME, builder::consumerName);
+ configuration.useOption(
+ PULSAR_CONSUMER_NAME,
+ consumerName -> String.format(consumerName, UUID.randomUUID()),
+ builder::consumerName);
configuration.useOption(PULSAR_READ_COMPACTED, builder::readCompacted);
configuration.useOption(PULSAR_PRIORITY_LEVEL, builder::priorityLevel);
configuration.useOption(
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
index 2c2c05d..e5da329 100644
---
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
@@ -61,7 +61,7 @@ class PulsarSinkITCase extends PulsarTestSuiteBase {
@ParameterizedTest
@EnumSource(DeliveryGuarantee.class)
void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
- // A random topic with partition 1.
+ // A random topic with partition 4.
String topic = randomAlphabetic(8);
operator().createTopic(topic, 4);
int counts = ThreadLocalRandom.current().nextInt(100, 200);