This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9bc8b0f37bec419bcdc4b8cdee3abf5320df5399 Author: Yufan Sheng <yu...@streamnative.io> AuthorDate: Wed Feb 9 14:56:54 2022 +0800 [FLINK-26023][connector/pulsar] Create a Pulsar sink config model for matching ProducerConfigurationData. --- .../connector/pulsar/sink/PulsarSinkOptions.java | 259 +++++++++++++++++++++ .../pulsar/sink/config/PulsarSinkConfigUtils.java | 112 +++++++++ .../pulsar/sink/config/SinkConfiguration.java | 147 ++++++++++++ 3 files changed, 518 insertions(+) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java new file mode 100644 index 0000000..0e16830 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.connector.pulsar.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.ConfigGroup;
+import org.apache.flink.annotation.docs.ConfigGroups;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
+
+import org.apache.pulsar.client.api.CompressionType;
+
+import java.time.Duration;
+import java.util.Map;
+
+import static java.util.Collections.emptyMap;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.flink.configuration.description.LinkElement.link;
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PRODUCER_CONFIG_PREFIX;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.SINK_CONFIG_PREFIX;
+import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_BATCHING_MAX_MESSAGES;
+import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES;
+import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
+
+/**
+ * Configurations for PulsarSink. All the options listed here could be configured in {@code
+ * PulsarSinkBuilder#setConfig(ConfigOption, Object)}. The {@link PulsarOptions} is also required
+ * for pulsar sink.
+ *
+ * <p>Options are split into two groups by key prefix: {@link #SINK_CONFIG_PREFIX} for the sink
+ * connector itself and {@link #PRODUCER_CONFIG_PREFIX} for the settings handed to the Pulsar
+ * producer.
+ *
+ * @see PulsarOptions for shared configuration options.
+ */
+@PublicEvolving
+@ConfigGroups(
+        groups = {
+            @ConfigGroup(name = "PulsarSink", keyPrefix = SINK_CONFIG_PREFIX),
+            @ConfigGroup(name = "PulsarProducer", keyPrefix = PRODUCER_CONFIG_PREFIX)
+        })
+public final class PulsarSinkOptions {
+
+    /** Pulsar sink connector config prefix. */
+    public static final String SINK_CONFIG_PREFIX = "pulsar.sink.";
+
+    /** Pulsar producer API config prefix. */
+    public static final String PRODUCER_CONFIG_PREFIX = "pulsar.producer.";
+
+    private PulsarSinkOptions() {
+        // This is a constant class
+    }
+
+    ///////////////////////////////////////////////////////////////////////////////
+    //
+    // The configuration for pulsar sink part.
+    // All the configuration listed below should have the pulsar.sink prefix.
+    //
+    ///////////////////////////////////////////////////////////////////////////////
+
+    public static final ConfigOption<DeliveryGuarantee> PULSAR_WRITE_DELIVERY_GUARANTEE =
+            ConfigOptions.key(SINK_CONFIG_PREFIX + "deliveryGuarantee")
+                    .enumType(DeliveryGuarantee.class)
+                    .defaultValue(DeliveryGuarantee.NONE)
+                    .withDescription("Optional delivery guarantee when committing.");
+
+    public static final ConfigOption<Long> PULSAR_WRITE_TRANSACTION_TIMEOUT =
+            ConfigOptions.key(SINK_CONFIG_PREFIX + "transactionTimeoutMillis")
+                    .longType()
+                    .defaultValue(Duration.ofHours(3).toMillis())
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "This option is used when the user requires the %s semantic.",
+                                            code("DeliveryGuarantee.EXACTLY_ONCE"))
+                                    // Leading space keeps the two sentences apart once concatenated.
+                                    .text(
+                                            " We would use transaction for making sure the message could be written only once.")
+                                    .build());
+
+    public static final ConfigOption<Long> PULSAR_TOPIC_METADATA_REFRESH_INTERVAL =
+            ConfigOptions.key(SINK_CONFIG_PREFIX + "topicMetadataRefreshInterval")
+                    .longType()
+                    .defaultValue(Duration.ofMinutes(30).toMillis())
+                    .withDescription(
+                            "Auto update the topic metadata in a fixed interval (in ms). The default value is 30 minutes.");
+
+    public static final ConfigOption<Boolean> PULSAR_WRITE_SCHEMA_EVOLUTION =
+            ConfigOptions.key(SINK_CONFIG_PREFIX + "enableSchemaEvolution")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            Description.builder()
+                                    // A sink writes messages, so describe serialization rather
+                                    // than consumption.
+                                    .text(
+                                            "If you enable this option, we would serialize and send the message by using Pulsar's %s.",
+                                            code("Schema"))
+                                    .build());
+
+    public static final ConfigOption<Integer> PULSAR_MAX_RECOMMIT_TIMES =
+            ConfigOptions.key(SINK_CONFIG_PREFIX + "maxRecommitTimes")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The allowed transaction recommit times if we meet some retryable exception."
+                                    + " This is used in Pulsar Transaction.");
+
+    ///////////////////////////////////////////////////////////////////////////////
+    //
+    // The configuration for ProducerConfigurationData part.
+    // All the configuration listed below should have the pulsar.producer prefix.
+    //
+    ///////////////////////////////////////////////////////////////////////////////
+
+    public static final ConfigOption<String> PULSAR_PRODUCER_NAME =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "producerName")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "A producer name which would be displayed in the Pulsar's dashboard."
+                                    + " If no producer name was provided, we would use a Pulsar generated name instead.");
+
+    public static final ConfigOption<Long> PULSAR_SEND_TIMEOUT_MS =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "sendTimeoutMs")
+                    .longType()
+                    .defaultValue(30000L)
+                    .withDescription(
+                            Description.builder()
+                                    .text("Message send timeout in ms.")
+                                    .text(
+                                            " If a message is not acknowledged by a server before the %s expires, an error occurs.",
+                                            code("sendTimeout"))
+                                    .build());
+
+    public static final ConfigOption<Integer> PULSAR_MAX_PENDING_MESSAGES =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "maxPendingMessages")
+                    .intType()
+                    .defaultValue(DEFAULT_MAX_PENDING_MESSAGES)
+                    .withDescription(
+                            Description.builder()
+                                    .text("The maximum size of a queue holding pending messages.")
+                                    .linebreak()
+                                    .text(
+                                            "For example, a message waiting to receive an acknowledgment from a %s.",
+                                            link(
+                                                    "broker",
+                                                    "https://pulsar.apache.org/docs/en/reference-terminology#broker"))
+                                    .linebreak()
+                                    .text(
+                                            "By default, when the queue is full, all calls to the %s and %s methods fail unless you set %s to true.",
+                                            code("Send"),
+                                            code("SendAsync"),
+                                            code("BlockIfQueueFull"))
+                                    .build());
+
+    public static final ConfigOption<Integer> PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "maxPendingMessagesAcrossPartitions")
+                    .intType()
+                    .defaultValue(DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The maximum number of pending messages across partitions.")
+                                    .linebreak()
+                                    .text(
+                                            "Use the setting to lower the max pending messages for each partition (%s) if the total number exceeds the configured value.",
+                                            code("setMaxPendingMessages"))
+                                    .build());
+
+    public static final ConfigOption<Long> PULSAR_BATCHING_MAX_PUBLISH_DELAY_MICROS =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "batchingMaxPublishDelayMicros")
+                    .longType()
+                    .defaultValue(MILLISECONDS.toMicros(1))
+                    .withDescription("Batching time period of sending messages.");
+
+    public static final ConfigOption<Integer>
+            PULSAR_BATCHING_PARTITION_SWITCH_FREQUENCY_BY_PUBLISH_DELAY =
+                    ConfigOptions.key(
+                                    PRODUCER_CONFIG_PREFIX
+                                            + "batchingPartitionSwitchFrequencyByPublishDelay")
+                            .intType()
+                            .defaultValue(10)
+                            .withDescription(
+                                    "The maximum wait time for switching topic partitions.");
+
+    public static final ConfigOption<Integer> PULSAR_BATCHING_MAX_MESSAGES =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "batchingMaxMessages")
+                    .intType()
+                    .defaultValue(DEFAULT_BATCHING_MAX_MESSAGES)
+                    .withDescription("The maximum number of messages permitted in a batch.");
+
+    public static final ConfigOption<Integer> PULSAR_BATCHING_MAX_BYTES =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "batchingMaxBytes")
+                    .intType()
+                    .defaultValue(128 * 1024)
+                    .withDescription(
+                            "The maximum size of messages permitted in a batch. Keep the maximum consistent as previous versions.");
+
+    public static final ConfigOption<Boolean> PULSAR_BATCHING_ENABLED =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "batchingEnabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Enable batch send ability, it was enabled by default.");
+
+    public static final ConfigOption<Boolean> PULSAR_CHUNKING_ENABLED =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "chunkingEnabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    // The description was empty, which leaves a blank cell in the generated
+                    // option docs. Wording follows Pulsar's ProducerBuilder#enableChunking.
+                    .withDescription(
+                            "If message size is higher than allowed max publish-payload size by broker,"
+                                    + " then enableChunking helps producer to split message into multiple chunks"
+                                    + " and publish them to broker separately and in order.");
+
+    public static final ConfigOption<CompressionType> PULSAR_COMPRESSION_TYPE =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "compressionType")
+                    .enumType(CompressionType.class)
+                    .defaultValue(CompressionType.NONE)
+                    .withDescription(
+                            Description.builder()
+                                    .text("Message data compression type used by a producer.")
+                                    .text(" Available options:")
+                                    .list(
+                                            link("LZ4", "https://github.com/lz4/lz4"),
+                                            link("ZLIB", "https://zlib.net/"),
+                                            link("ZSTD", "https://facebook.github.io/zstd/"),
+                                            link("SNAPPY", "https://google.github.io/snappy/"))
+                                    .build());
+
+    public static final ConfigOption<Long> PULSAR_INITIAL_SEQUENCE_ID =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "initialSequenceId")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The sequence id for avoiding the duplication, it's used when Pulsar doesn't have transaction.");
+
+    public static final ConfigOption<Map<String, String>> PULSAR_PRODUCER_PROPERTIES =
+            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "properties")
+                    .mapType()
+                    .defaultValue(emptyMap())
+                    .withDescription(
+                            Description.builder()
+                                    // These properties are set on the producer builder, so the
+                                    // text refers to the producer, not a consumer.
+                                    .text("A name or value property of this producer.")
+                                    .text(
+                                            " %s is application defined metadata attached to a producer.",
+                                            code("properties"))
+                                    .text(
+                                            " When getting a topic stats, associate this metadata with the producer stats for easier identification.")
+                                    .build());
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
new file mode 100644
index 0000000..13821fe
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.config;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigValidator;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+
+import java.util.Map;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_ENABLED;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_BYTES;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_PUBLISH_DELAY_MICROS;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_PARTITION_SWITCH_FREQUENCY_BY_PUBLISH_DELAY;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_CHUNKING_ENABLED;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_COMPRESSION_TYPE;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_INITIAL_SEQUENCE_ID;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_NAME;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_PROPERTIES;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_SEND_TIMEOUT_MS;
+import static org.apache.pulsar.client.api.MessageRoutingMode.SinglePartition;
+import static org.apache.pulsar.client.api.ProducerAccessMode.Shared;
+
+/**
+ * Helpers for the Pulsar sink: a validator for the sink configuration and a factory for the
+ * {@link ProducerBuilder} from which the message-sending {@link Producer} is created.
+ */
+@Internal
+public final class PulsarSinkConfigUtils {
+
+    private PulsarSinkConfigUtils() {
+        // Static helpers only; never instantiated.
+    }
+
+    /**
+     * Validator for the sink config. It registers the service URL and the admin URL as required
+     * options, and the two auth-parameter options as conflicting with each other.
+     */
+    public static final PulsarConfigValidator SINK_CONFIG_VALIDATOR =
+            PulsarConfigValidator.builder()
+                    .requiredOption(PULSAR_SERVICE_URL)
+                    .requiredOption(PULSAR_ADMIN_URL)
+                    .conflictOptions(PULSAR_AUTH_PARAMS, PULSAR_AUTH_PARAM_MAP)
+                    .build();
+
+    /**
+     * Builds a {@link ProducerBuilder} for the given schema from the sink configuration.
+     *
+     * @param client the Pulsar client the producer builder is obtained from.
+     * @param schema the schema passed to {@link PulsarClient#newProducer(Schema)}.
+     * @param configuration the sink configuration providing the producer options.
+     * @param <T> the message type of the schema.
+     * @return the producer builder with the configured options and the fixed sink defaults applied.
+     */
+    public static <T> ProducerBuilder<T> createProducerBuilder(
+            PulsarClient client, Schema<T> schema, SinkConfiguration configuration) {
+        final ProducerBuilder<T> producerBuilder = client.newProducer(schema);
+
+        // Hand every producer option to its builder setter through useOption.
+        // NOTE(review): presumably useOption only invokes the setter when a value is available;
+        // confirm against PulsarConfiguration#useOption.
+        configuration.useOption(PULSAR_PRODUCER_NAME, producerBuilder::producerName);
+        // The option is a long, while the builder takes an int timeout.
+        configuration.useOption(
+                PULSAR_SEND_TIMEOUT_MS,
+                Math::toIntExact,
+                timeoutMs -> producerBuilder.sendTimeout(timeoutMs, MILLISECONDS));
+        configuration.useOption(PULSAR_MAX_PENDING_MESSAGES, producerBuilder::maxPendingMessages);
+        configuration.useOption(
+                PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS,
+                producerBuilder::maxPendingMessagesAcrossPartitions);
+        configuration.useOption(
+                PULSAR_BATCHING_MAX_PUBLISH_DELAY_MICROS,
+                delayMicros -> producerBuilder.batchingMaxPublishDelay(delayMicros, MICROSECONDS));
+        configuration.useOption(
+                PULSAR_BATCHING_PARTITION_SWITCH_FREQUENCY_BY_PUBLISH_DELAY,
+                producerBuilder::roundRobinRouterBatchingPartitionSwitchFrequency);
+        configuration.useOption(PULSAR_BATCHING_MAX_MESSAGES, producerBuilder::batchingMaxMessages);
+        configuration.useOption(PULSAR_BATCHING_MAX_BYTES, producerBuilder::batchingMaxBytes);
+        configuration.useOption(PULSAR_BATCHING_ENABLED, producerBuilder::enableBatching);
+        configuration.useOption(PULSAR_CHUNKING_ENABLED, producerBuilder::enableChunking);
+        configuration.useOption(PULSAR_COMPRESSION_TYPE, producerBuilder::compressionType);
+        configuration.useOption(PULSAR_INITIAL_SEQUENCE_ID, producerBuilder::initialSequenceId);
+
+        // Producer properties are only forwarded when at least one entry is configured.
+        Map<String, String> producerProperties =
+                configuration.getProperties(PULSAR_PRODUCER_PROPERTIES);
+        if (!producerProperties.isEmpty()) {
+            producerBuilder.properties(producerProperties);
+        }
+
+        // Fixed settings that are not exposed as options.
+        // We use non-partitioned producer by default. This wouldn't be changed in the future.
+        producerBuilder
+                .blockIfQueueFull(true)
+                .messageRoutingMode(SinglePartition)
+                .enableMultiSchema(false)
+                .autoUpdatePartitions(false)
+                .accessMode(Shared)
+                .enableLazyStartPartitionedProducers(false);
+
+        return producerBuilder;
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
new file mode 100644
index 0000000..e0ef7ff
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink.Sink.InitContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration;
+
+import org.apache.pulsar.client.api.Schema;
+
+import java.util.Objects;
+
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_RECOMMIT_TIMES;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_SCHEMA_EVOLUTION;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_TRANSACTION_TIMEOUT;
+
+/**
+ * The configured class for pulsar sink. The sink options are read once in the constructor and
+ * exposed through getters.
+ */
+@PublicEvolving
+public class SinkConfiguration extends PulsarConfiguration {
+    private static final long serialVersionUID = 4941360605051251153L;
+
+    private final DeliveryGuarantee deliveryGuarantee;
+    private final long transactionTimeoutMillis;
+    private final long topicMetadataRefreshInterval;
+    private final int partitionSwitchSize;
+    private final boolean enableSchemaEvolution;
+    private final int maxPendingMessages;
+    private final int maxRecommitTimes;
+
+    /**
+     * Creates the sink configuration and caches the sink options from the given configuration.
+     *
+     * @param configuration the configuration the sink options are read from.
+     */
+    public SinkConfiguration(Configuration configuration) {
+        super(configuration);
+
+        this.deliveryGuarantee = get(PULSAR_WRITE_DELIVERY_GUARANTEE);
+        this.transactionTimeoutMillis = getLong(PULSAR_WRITE_TRANSACTION_TIMEOUT);
+        this.topicMetadataRefreshInterval = getLong(PULSAR_TOPIC_METADATA_REFRESH_INTERVAL);
+        // NOTE(review): the switch size is taken from the batching max messages option;
+        // confirm this reuse is intended.
+        this.partitionSwitchSize = getInteger(PULSAR_BATCHING_MAX_MESSAGES);
+        this.enableSchemaEvolution = get(PULSAR_WRITE_SCHEMA_EVOLUTION);
+        // NOTE(review): read from the "across partitions" option rather than
+        // PULSAR_MAX_PENDING_MESSAGES; confirm this is intended.
+        this.maxPendingMessages = get(PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS);
+        this.maxRecommitTimes = get(PULSAR_MAX_RECOMMIT_TIMES);
+    }
+
+    /** The delivery guarantee changes the behavior of {@code PulsarWriter}. */
+    public DeliveryGuarantee getDeliveryGuarantee() {
+        return deliveryGuarantee;
+    }
+
+    /**
+     * Pulsar's transactions have a timeout mechanism for the uncommitted transaction. We use
+     * transactions for making sure the message could be written only once. Since the checkpoint
+     * interval couldn't be acquired from {@link InitContext}, we have to expose this option. Make
+     * sure this value is greater than the checkpoint interval.
+     */
+    public long getTransactionTimeoutMillis() {
+        return transactionTimeoutMillis;
+    }
+
+    /**
+     * Auto-update the topic metadata in a fixed interval (in ms). The default value is 30 minutes.
+     */
+    public long getTopicMetadataRefreshInterval() {
+        return topicMetadataRefreshInterval;
+    }
+
+    /**
+     * Switch the partition to write when we have written the given size of messages. It's used for
+     * a round-robin topic router.
+     */
+    public int getPartitionSwitchSize() {
+        return partitionSwitchSize;
+    }
+
+    /**
+     * If we should serialize and send the message with a specified Pulsar {@link Schema} instead
+     * of the default {@link Schema#BYTES}. This switch is only used for {@code
+     * PulsarSchemaWrapper}.
+     */
+    public boolean isEnableSchemaEvolution() {
+        return enableSchemaEvolution;
+    }
+
+    /**
+     * Pulsar message is sent asynchronously. Set this option for limiting the pending messages in a
+     * Pulsar writer instance.
+     */
+    public int getMaxPendingMessages() {
+        return maxPendingMessages;
+    }
+
+    /** The maximum allowed number of recommit attempts for a Pulsar transaction. */
+    public int getMaxRecommitTimes() {
+        return maxRecommitTimes;
+    }
+
+    /**
+     * Two sink configurations are equal when they have the same class, the parent state is equal
+     * and every cached sink option, including the delivery guarantee, matches.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        SinkConfiguration that = (SinkConfiguration) o;
+        // deliveryGuarantee is an enum, so reference comparison is exact and null-safe.
+        return deliveryGuarantee == that.deliveryGuarantee
+                && transactionTimeoutMillis == that.transactionTimeoutMillis
+                && topicMetadataRefreshInterval == that.topicMetadataRefreshInterval
+                && partitionSwitchSize == that.partitionSwitchSize
+                && enableSchemaEvolution == that.enableSchemaEvolution
+                && maxPendingMessages == that.maxPendingMessages
+                && maxRecommitTimes == that.maxRecommitTimes;
+    }
+
+    /** Hashes the same set of fields that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                super.hashCode(),
+                deliveryGuarantee,
+                transactionTimeoutMillis,
+                topicMetadataRefreshInterval,
+                partitionSwitchSize,
+                enableSchemaEvolution,
+                maxPendingMessages,
+                maxRecommitTimes);
+    }
+}