syhily commented on a change in pull request #17452: URL: https://github.com/apache/flink/pull/17452#discussion_r805879925
########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink.writer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.operators.ProcessingTimeService; +import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext; +import org.apache.flink.api.connector.sink2.Sink.InitContext; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext; +import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextAdapter; +import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter; +import 
org.apache.flink.connector.pulsar.sink.writer.serializer.BytesSchema; +import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener; +import org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister; +import org.apache.flink.util.FlinkRuntimeException; + +import org.apache.flink.shaded.guava30.com.google.common.base.Strings; + +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; + +import static java.util.Collections.emptyList; +import static org.apache.flink.util.IOUtils.closeAll; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class is responsible to write records in a Pulsar topic and to handle the different delivery + * {@link DeliveryGuarantee}s. + * + * @param <IN> The type of the input elements. + */ +@Internal +public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommittable> { + private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class); + + private final SinkConfiguration sinkConfiguration; + private final PulsarSerializationSchema<IN> serializationSchema; + private final TopicMetadataListener metadataListener; + private final TopicRouter<IN> topicRouter; + private final Schema<IN> schema; + private final DeliveryGuarantee deliveryGuarantee; + private final PulsarSinkContextAdapter sinkContextAdapter; + private final MailboxExecutor mailboxExecutor; + private final TopicProducerRegister<IN> producerRegister; + private final Semaphore pendingMessages; + + /** + * Constructor creating a Pulsar writer. 
+ * + * <p>It will throw a {@link RuntimeException} if {@link + * PulsarSerializationSchema#open(InitializationContext, PulsarSinkContext, SinkConfiguration)} + * fails. + * + * @param sinkConfiguration the configuration to configure the Pulsar producer. + * @param serializationSchema transform the incoming records into different message properties. + * @param metadataListener the listener for querying topic metadata. + * @param topicRouter topic router to choose topic by incoming records. + * @param initContext context to provide information about the runtime environment. + */ + public PulsarWriter( + SinkConfiguration sinkConfiguration, + PulsarSerializationSchema<IN> serializationSchema, + TopicMetadataListener metadataListener, + TopicRouter<IN> topicRouter, + InitContext initContext) { + this.sinkConfiguration = checkNotNull(sinkConfiguration); + this.serializationSchema = checkNotNull(serializationSchema); + this.metadataListener = checkNotNull(metadataListener); + this.topicRouter = checkNotNull(topicRouter); + checkNotNull(initContext); + + if (sinkConfiguration.isEnableSchemaEvolution()) { + this.schema = serializationSchema.schema(); + } else { + this.schema = new BytesSchema<>(serializationSchema); + } + + this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee(); + this.sinkContextAdapter = new PulsarSinkContextAdapter(initContext, sinkConfiguration); + this.mailboxExecutor = initContext.getMailboxExecutor(); + + // Initialize topic metadata listener. + LOG.debug("Initialize topic metadata after creating Pulsar writer."); + ProcessingTimeService timeService = initContext.getProcessingTimeService(); + this.metadataListener.open(sinkConfiguration, timeService); + + // Initialize topic router. + this.topicRouter.open(sinkConfiguration); + + // Initialize the serialization schema. 
+ try { + InitializationContext initializationContext = + initContext.asSerializationSchemaInitializationContext(); + this.serializationSchema.open( + initializationContext, sinkContextAdapter, sinkConfiguration); + } catch (Exception e) { + throw new FlinkRuntimeException("Cannot initialize schema.", e); + } + + // Create this producer register after opening serialization schema! + this.producerRegister = new TopicProducerRegister<>(sinkConfiguration); + this.pendingMessages = new Semaphore(sinkConfiguration.getMaxPendingMessages()); + } + + @Override + public void write(IN element, Context context) throws IOException, InterruptedException { + // Serialize the incoming element. + sinkContextAdapter.updateTimestamp(context); + + // Choose the right topic to send. + List<String> availableTopics = metadataListener.availableTopics(); + String topic = topicRouter.route(element, availableTopics, sinkContextAdapter); + + // Create message builder for sending message. + TypedMessageBuilder<IN> builder = createMessageBuilder(topic, element, sinkContextAdapter); + + // Perform message sending. + if (deliveryGuarantee == DeliveryGuarantee.NONE) { + // We would just ignore the sending exception. This may cause data loss. + builder.sendAsync(); + } else { + // Waiting for permits to write message. 
+ pendingMessages.acquire(); + CompletableFuture<MessageId> sender = builder.sendAsync(); + sender.whenComplete( + (id, ex) -> { + pendingMessages.release(); + if (ex != null) { + mailboxExecutor.execute( + () -> { + throw new FlinkRuntimeException( + "Failed to send data to Pulsar " + topic, ex); + }, + "Failed to send message to Pulsar"); + } else { + LOG.debug("Sent message to Pulsar {} with message id {}", topic, id); + } + }); + } + } + + private TypedMessageBuilder<IN> createMessageBuilder( + String topic, IN element, PulsarSinkContextAdapter context) { + TypedMessageBuilder<IN> builder = + producerRegister.createMessageBuilder(topic, element, schema); + + byte[] orderingKey = serializationSchema.orderingKey(element, context); + if (orderingKey != null) { + builder.orderingKey(orderingKey); + } + + String key = serializationSchema.key(element, context); + if (!Strings.isNullOrEmpty(key)) { + builder.key(key); + } + + Map<String, String> properties = serializationSchema.properties(element, context); + if (properties != null) { + builder.properties(properties); + } + + Long timestamp = serializationSchema.eventTime(element, context); + if (timestamp != null) { + builder.eventTime(timestamp); + } + + List<String> clusters = serializationSchema.replicationClusters(element, context); + if (clusters != null) { + builder.replicationClusters(clusters); + } + + if (serializationSchema.disableReplication(element, context)) { + builder.disableReplication(); + } + + return builder; + } + + @Override + public void flush(boolean endOfInput) throws IOException { + if (endOfInput) { + // Try flush only once when we meet the end of the input. + producerRegister.flush(); + } else { + while (pendingMessages.availablePermits() < sinkConfiguration.getMaxPendingMessages()) { Review comment: Let me explain this in detail. 
`pendingMessages` is created with `new Semaphore(sinkConfiguration.getMaxPendingMessages())`, so the number of available permits can only ever be equal to or less than that maximum. We block message writing with `pendingMessages.acquire()` until a permit is finally available to write a message. We trigger `flush(false)` before doing the checkpoint. `pendingMessages.availablePermits() == sinkConfiguration.getMaxPendingMessages()` means that there is no pending message, because we return the permit with `pendingMessages.release()` when the message writing is complete in the mailbox thread. I think the confusion for you is just that `Semaphore` is not commonly used in Java. XD -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
