This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


The following commit(s) were added to refs/heads/main by this push:
     new 787f5e4  [hotfix] Refactor callers that use deprecated get/setXXX of Configuration
787f5e4 is described below

commit 787f5e4e5bfdc5f7f17929d5f3d508a004394599
Author: Rui Fan <[email protected]>
AuthorDate: Sat Sep 14 16:11:30 2024 +0800

    [hotfix] Refactor callers that use deprecated get/setXXX of Configuration
---
 .../flink/connector/pulsar/sink/config/SinkConfiguration.java       | 6 +++---
 .../flink/connector/pulsar/source/config/SourceConfiguration.java   | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
index e5d6dc5..3bc7a42 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
@@ -61,9 +61,9 @@ public class SinkConfiguration extends PulsarConfiguration {
         super(configuration);
 
         this.deliveryGuarantee = get(PULSAR_WRITE_DELIVERY_GUARANTEE);
-        this.transactionTimeoutMillis = getLong(PULSAR_WRITE_TRANSACTION_TIMEOUT);
-        this.topicMetadataRefreshInterval = getLong(PULSAR_TOPIC_METADATA_REFRESH_INTERVAL);
-        this.partitionSwitchSize = getInteger(PULSAR_BATCHING_MAX_MESSAGES);
+        this.transactionTimeoutMillis = get(PULSAR_WRITE_TRANSACTION_TIMEOUT);
+        this.topicMetadataRefreshInterval = get(PULSAR_TOPIC_METADATA_REFRESH_INTERVAL);
+        this.partitionSwitchSize = get(PULSAR_BATCHING_MAX_MESSAGES);
         this.messageKeyHash = get(PULSAR_MESSAGE_KEY_HASH);
         this.enableSchemaEvolution = get(PULSAR_WRITE_SCHEMA_EVOLUTION);
         this.maxRecommitTimes = get(PULSAR_MAX_RECOMMIT_TIMES);
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
index eb76f8c..2b47297 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
@@ -72,7 +72,7 @@ public class SourceConfiguration extends PulsarConfiguration {
     public SourceConfiguration(Configuration configuration) {
         super(configuration);
 
-        this.messageQueueCapacity = getInteger(ELEMENT_QUEUE_CAPACITY);
+        this.messageQueueCapacity = get(ELEMENT_QUEUE_CAPACITY);
         this.partitionDiscoveryIntervalMs = get(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS);
         this.enableAutoAcknowledgeMessage = get(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE);
         this.autoCommitCursorInterval = get(PULSAR_AUTO_COMMIT_CURSOR_INTERVAL);

Reply via email to