fapaul commented on a change in pull request #17452:
URL: https://github.com/apache/flink/pull/17452#discussion_r741733756



##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommitter;
+import org.apache.flink.connector.pulsar.sink.writer.PulsarWriter;
+import org.apache.flink.connector.pulsar.sink.writer.PulsarWriterState;
+import 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriterStateSerializer;
+import 
org.apache.flink.connector.pulsar.sink.writer.selector.PartitionSelector;
+import org.apache.flink.connector.pulsar.sink.writer.selector.TopicSelector;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * a pulsar Sink implement.
+ *
+ * @param <IN> record data type.
+ */
+@PublicEvolving
+public class PulsarSink<IN> implements Sink<IN, PulsarSinkCommittable, 
PulsarWriterState, Void> {
+
+    private final DeliveryGuarantee deliveryGuarantee;
+
+    private final TopicSelector<IN> topicSelector;
+    private final PulsarSerializationSchema<IN, ?> serializationSchema;
+    private final PartitionSelector<IN> partitionSelector;
+
+    private final Configuration configuration;
+
+    public PulsarSink(

Review comment:
       The constructor can probably be made package-private.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/IncompatibleSchemaException.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.schema;
+
+/** Exception designates the incompatibility between pulsar and flink type. */
+public class IncompatibleSchemaException extends RuntimeException {
+
+    public IncompatibleSchemaException(String message, Throwable e) {
+        super(message, e);
+    }
+
+    public IncompatibleSchemaException(String message) {

Review comment:
       Is this constructor unused? If so, it can be removed.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommitter.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.committer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkCommittable;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import 
org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** Committer implementation for {@link 
org.apache.flink.connector.pulsar.sink.PulsarSink}. */
+@Internal
+public class PulsarCommitter implements Committer<PulsarSinkCommittable>, 
Closeable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarCommitter.class);
+
+    private final DeliveryGuarantee deliveryGuarantee;
+
+    private final Configuration configuration;
+
+    private transient PulsarClientImpl pulsarClient;
+
+    private final Closer closer = Closer.create();
+
+    public PulsarCommitter(DeliveryGuarantee deliveryGuarantee, Configuration 
configuration) {
+        this.deliveryGuarantee = deliveryGuarantee;
+        this.configuration = configuration;
+    }
+
+    @Override
+    public List<PulsarSinkCommittable> commit(List<PulsarSinkCommittable> 
committables)
+            throws IOException {
+        if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE || 
committables.isEmpty()) {
+            return Collections.emptyList();
+        }
+        final TransactionCoordinatorClientImpl tcClient = getTcClient();
+        LOG.debug("commit committables, current tcClient state: {}", 
tcClient.getState());
+        List<PulsarSinkCommittable> recoveryCommittables = new ArrayList<>();
+        for (PulsarSinkCommittable committable : committables) {
+            LOG.debug("commit committables {}", committable.getTxnID());
+            try {
+                tcClient.commit(committable.getTxnID());
+            } catch (TransactionCoordinatorClientException e) {
+                LOG.error("commit committables failed " + 
committable.getTxnID(), e);
+                recoveryCommittables.add(committable);
+            }
+        }
+        return recoveryCommittables;
+    }
+
+    private synchronized TransactionCoordinatorClientImpl getTcClient()
+            throws TransactionCoordinatorClientException {

Review comment:
       The declared exception seems to be unnecessary.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils;
+import 
org.apache.flink.connector.pulsar.common.schema.IncompatibleSchemaException;
+import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkCommittable;
+import org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.AtLeastOnceDelivery;
+import org.apache.flink.connector.pulsar.sink.writer.delivery.Delivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.ExactlyOnceDelivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.selector.PartitionSelector;
+import org.apache.flink.connector.pulsar.sink.writer.selector.TopicSelector;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * a pulsar SinkWriter implement.
+ *
+ * @param <IN> record data type.
+ */
+@Internal
+public class PulsarWriter<IN> implements SinkWriter<IN, PulsarSinkCommittable, 
PulsarWriterState> {

Review comment:
       Can you double-check the fields of this class? Some of them can be final, 
and some of them are probably not needed at all.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommitter.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.committer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkCommittable;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import 
org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** Committer implementation for {@link 
org.apache.flink.connector.pulsar.sink.PulsarSink}. */
+@Internal
+public class PulsarCommitter implements Committer<PulsarSinkCommittable>, 
Closeable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarCommitter.class);
+
+    private final DeliveryGuarantee deliveryGuarantee;
+
+    private final Configuration configuration;
+
+    private transient PulsarClientImpl pulsarClient;
+
+    private final Closer closer = Closer.create();
+
+    public PulsarCommitter(DeliveryGuarantee deliveryGuarantee, Configuration 
configuration) {
+        this.deliveryGuarantee = deliveryGuarantee;
+        this.configuration = configuration;
+    }
+
+    @Override
+    public List<PulsarSinkCommittable> commit(List<PulsarSinkCommittable> 
committables)
+            throws IOException {
+        if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE || 
committables.isEmpty()) {
+            return Collections.emptyList();
+        }
+        final TransactionCoordinatorClientImpl tcClient = getTcClient();
+        LOG.debug("commit committables, current tcClient state: {}", 
tcClient.getState());
+        List<PulsarSinkCommittable> recoveryCommittables = new ArrayList<>();
+        for (PulsarSinkCommittable committable : committables) {
+            LOG.debug("commit committables {}", committable.getTxnID());
+            try {
+                tcClient.commit(committable.getTxnID());
+            } catch (TransactionCoordinatorClientException e) {
+                LOG.error("commit committables failed " + 
committable.getTxnID(), e);
+                recoveryCommittables.add(committable);
+            }
+        }
+        return recoveryCommittables;
+    }
+
+    private synchronized TransactionCoordinatorClientImpl getTcClient()

Review comment:
       Why is this method synchronized? AFAIK, commit is never called in parallel.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils;
+import 
org.apache.flink.connector.pulsar.common.schema.IncompatibleSchemaException;
+import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkCommittable;
+import org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.AtLeastOnceDelivery;
+import org.apache.flink.connector.pulsar.sink.writer.delivery.Delivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.ExactlyOnceDelivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.selector.PartitionSelector;
+import org.apache.flink.connector.pulsar.sink.writer.selector.TopicSelector;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * a pulsar SinkWriter implement.
+ *
+ * @param <IN> record data type.
+ */
+@Internal
+public class PulsarWriter<IN> implements SinkWriter<IN, PulsarSinkCommittable, 
PulsarWriterState> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarWriter.class);
+
+    private transient Map<String, Producer<?>> topic2Producer;
+
+    private transient PulsarAdmin admin;
+
+    private transient BiConsumer<MessageId, Throwable> sendCallback;
+    private final AtomicLong pendingRecords = new AtomicLong();
+    //    private ConcurrentHashMap<TxnID, List<MessageId>> tid2MessagesMap;
+    //
+    //    private ConcurrentHashMap<TxnID, List<CompletableFuture<MessageId>>> 
tid2FuturesMap;
+
+    //    private Transaction currentTransaction;
+
+    private List<CompletableFuture<MessageId>> futures;
+    //    private ConcurrentHashMap<PulsarWriterState, 
List<CompletableFuture<MessageId>>>
+    // tid2FuturesMap;
+    // ---------------- //
+    private final Configuration configuration;
+    private final SinkConfiguration sinkConfiguration;
+    private final TopicSelector<IN> topicSelector;
+    private final PulsarSerializationSchema<IN, ?> serializationSchema;
+
+    private final PartitionSelector<IN> partitionSelector;
+    private final UserCodeClassLoader userCodeClassLoader;
+
+    private final Delivery delivery;
+
+    private transient PulsarClientImpl pulsarClient;
+
+    private final Closer closer = Closer.create();
+
+    public PulsarWriter(
+            DeliveryGuarantee deliveryGuarantee,
+            TopicSelector<IN> topicSelector,
+            PulsarSerializationSchema<IN, ?> serializationSchema,
+            PartitionSelector<IN> partitionSelector,
+            Configuration configuration,
+            Sink.InitContext sinkInitContext) {
+        this.topicSelector = topicSelector;
+        this.serializationSchema = serializationSchema;
+        this.sinkConfiguration = new SinkConfiguration(configuration);
+        this.partitionSelector = partitionSelector;
+        this.configuration = configuration;
+        this.userCodeClassLoader = sinkInitContext.getUserCodeClassLoader();
+        this.topic2Producer = new HashMap<>();
+
+        this.futures = Collections.synchronizedList(new ArrayList<>());
+        try {
+            admin = PulsarConfigUtils.createAdmin(configuration);
+            closer.register(admin);
+            serializationSchema.open(
+                    new SerializationSchema.InitializationContext() {
+                        @Override
+                        public MetricGroup getMetricGroup() {
+                            return null;
+                        }
+
+                        @Override
+                        public UserCodeClassLoader getUserCodeClassLoader() {
+                            return userCodeClassLoader;
+                        }
+                    });
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        if (deliveryGuarantee == EXACTLY_ONCE) {
+            this.delivery = new ExactlyOnceDelivery(sinkConfiguration, 
this::getPulsarClient);
+        } else {
+            this.delivery = new AtLeastOnceDelivery();
+        }
+        closer.register(delivery);
+        this.sendCallback =
+                initializeSendCallback(sinkConfiguration, 
sinkInitContext.getMailboxExecutor());
+    }
+
+    @Override
+    public void write(IN value, Context context) throws IOException {
+        String topic = topicSelector.selector(value);
+        TypedMessageBuilder messageBuilder = 
delivery.newMessage(getProducer(topic));
+        serializationSchema.serialize(value, messageBuilder);
+
+        CompletableFuture<MessageId> messageIdFuture = 
messageBuilder.sendAsync();
+        pendingRecords.incrementAndGet();
+        futures.add(messageIdFuture);
+        messageIdFuture.whenComplete(sendCallback);
+    }
+
+    public void initializeState(List<PulsarWriterState> states) throws 
IOException {
+        checkNotNull(states, "The retrieved state was null.");
+        for (PulsarWriterState state : states) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Restoring: {}", state);
+            }
+        }
+    }
+
+    @Override
+    public List<PulsarSinkCommittable> prepareCommit(boolean flush) throws 
IOException {
+        if (!flush) {
+            return Collections.emptyList();
+        }
+        producerFlush();
+        List<PulsarSinkCommittable> committables = 
delivery.prepareCommit(flush);
+        LOG.debug("Committing {} committables, final commit={}.", 
committables, flush);
+        return committables;
+    }
+
+    @Override
+    public List<PulsarWriterState> snapshotState(long checkpointId) throws 
IOException {
+        return delivery.snapshotState(checkpointId);
+    }
+
+    @Override
+    public void close() throws Exception {
+        producerFlush();
+        closer.close();
+    }
+
+    public void producerFlush() throws IOException {
+        for (Producer<?> p : topic2Producer.values()) {
+            p.flush();
+        }
+
+        for (CompletableFuture<MessageId> completableFuture : futures) {
+            try {
+                completableFuture.get();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } catch (ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        futures.clear();
+
+        if (pendingRecords.get() > 0) {
+            throw new RuntimeException("The message could not be sent");
+        }
+    }
+
+    // ------------------------------internal 
method------------------------------
+
+    private synchronized PulsarClientImpl getPulsarClient() {
+        if (pulsarClient != null) {
+            return pulsarClient;
+        }
+        pulsarClient = (PulsarClientImpl) 
PulsarConfigUtils.createClient(configuration);
+        closer.register(pulsarClient);
+        return pulsarClient;
+    }
+
+    protected Producer<?> getProducer(String topic) throws 
PulsarClientException {
+        Producer<?> producer = topic2Producer.get(topic);
+        if (producer != null && producer.isConnected()) {
+            return producer;
+        }
+
+        final ProducerBuilder<?> producerBuilder =
+                PulsarSinkConfigUtils.createProducerBuilder(
+                        getPulsarClient(), serializationSchema.getSchema(), 
configuration);
+        producerBuilder.topic(topic);
+
+        if (partitionSelector != null) {
+            producerBuilder.messageRouter(generateMessageRouter());
+        }
+        producer = producerBuilder.create();
+        closer.register(producer);
+        uploadSchema(topic);
+        topic2Producer.put(topic, producer);
+        return producer;
+    }
+
+    @SuppressWarnings("unchecked")
+    private MessageRouter generateMessageRouter() {
+        return new MessageRouter() {
+            @Override
+            public int choosePartition(Message<?> msg, TopicMetadata metadata) 
{
+                return partitionSelector.select(
+                        (Message<IN>) msg,
+                        new 
org.apache.flink.connector.pulsar.sink.writer.selector.TopicMetadata(
+                                metadata.numPartitions()));
+            }
+        };
+    }
+
+    protected BiConsumer<MessageId, Throwable> initializeSendCallback(
+            SinkConfiguration sinkConfiguration, MailboxExecutor 
mailboxExecutor) {
+        if (sinkConfiguration.isFailOnWrite()) {
+            return (t, u) -> {
+                if (u == null) {
+                    acknowledgeMessage();
+                    return;
+                }
+                mailboxExecutor.execute(
+                        () -> {
+                            throw new FlinkRuntimeException(u);
+                        },
+                        "Failed to send data to Pulsar");
+            };
+        } else {
+            return (t, u) -> {
+                if (u != null) {
+                    LOG.error(
+                            "Error while sending message to Pulsar: {}",
+                            ExceptionUtils.stringifyException(u));
+                }
+                acknowledgeMessage();
+            };
+        }
+    }
+
+    private void acknowledgeMessage() {
+        synchronized (pendingRecords) {
+            if (pendingRecords.decrementAndGet() == 0L) {
+                pendingRecords.notifyAll();
+            }
+        }
+    }

Review comment:
       Can you explain this block? In general, synchronization should not be 
necessary if everything is executed in the mailbox.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommitter;
+import org.apache.flink.connector.pulsar.sink.writer.PulsarWriter;
+import org.apache.flink.connector.pulsar.sink.writer.PulsarWriterState;
+import 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriterStateSerializer;
+import 
org.apache.flink.connector.pulsar.sink.writer.selector.PartitionSelector;
+import org.apache.flink.connector.pulsar.sink.writer.selector.TopicSelector;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * a pulsar Sink implement.

Review comment:
       Can you add more information about the capabilities of the sink? You can 
have a look at the KafkaSink for reference :)

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommitter;
+import org.apache.flink.connector.pulsar.sink.writer.PulsarWriter;
+import org.apache.flink.connector.pulsar.sink.writer.PulsarWriterState;
+import 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriterStateSerializer;
+import 
org.apache.flink.connector.pulsar.sink.writer.selector.PartitionSelector;
+import org.apache.flink.connector.pulsar.sink.writer.selector.TopicSelector;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * a pulsar Sink implement.
+ *
+ * @param <IN> record data type.
+ */
+@PublicEvolving
+public class PulsarSink<IN> implements Sink<IN, PulsarSinkCommittable, 
PulsarWriterState, Void> {
+
+    private final DeliveryGuarantee deliveryGuarantee;
+
+    private final TopicSelector<IN> topicSelector;
+    private final PulsarSerializationSchema<IN, ?> serializationSchema;
+    private final PartitionSelector<IN> partitionSelector;
+
+    private final Configuration configuration;
+
+    public PulsarSink(
+            DeliveryGuarantee deliveryGuarantee,
+            TopicSelector<IN> topicSelector,
+            PulsarSerializationSchema<IN, ?> serializationSchema,
+            PartitionSelector<IN> partitionSelector,
+            Configuration configuration) {
+        this.deliveryGuarantee = deliveryGuarantee;
+        this.topicSelector = topicSelector;
+        this.serializationSchema = serializationSchema;
+        this.partitionSelector = partitionSelector;
+        this.configuration = configuration;

Review comment:
       We usually use `Preconditions.checkNotNull()` to ensure ctor 
parameters are not null

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.ConfigGroup;
+import org.apache.flink.annotation.docs.ConfigGroups;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PRODUCER_CONFIG_PREFIX;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.SINK_CONFIG_PREFIX;
+
+/** the options for Pulsar Sink. */
+@PublicEvolving
+@ConfigGroups(
+        groups = {
+            @ConfigGroup(name = "PulsarSink", keyPrefix = SINK_CONFIG_PREFIX),
+            @ConfigGroup(name = "PulsarProducer", keyPrefix = 
PRODUCER_CONFIG_PREFIX)
+        })
+public final class PulsarSinkOptions {
+
+    // Pulsar sink connector config prefix.
+    public static final String SINK_CONFIG_PREFIX = "pulsar.sink.";
+    // Pulsar producer API config prefix.
+    public static final String PRODUCER_CONFIG_PREFIX = "pulsar.producer.";

Review comment:
       Can both be private?

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/IncompatibleSchemaException.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.schema;
+
+/** Exception designates the incompatibility between pulsar and flink type. */
+public class IncompatibleSchemaException extends RuntimeException {

Review comment:
       Please mark as `@Internal`

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommitter.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.committer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkCommittable;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import 
org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** Committer implementation for {@link 
org.apache.flink.connector.pulsar.sink.PulsarSink}. */
+@Internal
+public class PulsarCommitter implements Committer<PulsarSinkCommittable>, 
Closeable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarCommitter.class);
+
+    private final DeliveryGuarantee deliveryGuarantee;
+
+    private final Configuration configuration;
+
+    private transient PulsarClientImpl pulsarClient;
+
+    private final Closer closer = Closer.create();
+
+    public PulsarCommitter(DeliveryGuarantee deliveryGuarantee, Configuration 
configuration) {
+        this.deliveryGuarantee = deliveryGuarantee;
+        this.configuration = configuration;
+    }
+
+    @Override
+    public List<PulsarSinkCommittable> commit(List<PulsarSinkCommittable> 
committables)
+            throws IOException {
+        if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE || 
committables.isEmpty()) {

Review comment:
       We should not forward any committables from the sink writer if 
`deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE`; currently they are always 
written to the committer state.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
+import 
org.apache.flink.connector.pulsar.sink.writer.selector.PartitionSelector;
+import org.apache.flink.connector.pulsar.sink.writer.selector.TopicSelector;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MESSAGE_ROUTING_MODE;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS;
+import static 
org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils.checkConfigurations;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * PulsarSink's builder is used to simplify the creation of Sink.

Review comment:
       Can you add an example here of how to use the builder to construct a Pulsar 
sink?

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
+import 
org.apache.flink.connector.pulsar.sink.writer.selector.PartitionSelector;
+import org.apache.flink.connector.pulsar.sink.writer.selector.TopicSelector;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MESSAGE_ROUTING_MODE;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS;
+import static 
org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils.checkConfigurations;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * PulsarSink's builder is used to simplify the creation of Sink.
+ *
+ * @param <IN>
+ */
+public class PulsarSinkBuilder<IN> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarSinkBuilder.class);
+
+    private DeliveryGuarantee deliveryGuarantee;
+    private TopicSelector<IN> topicSelector;
+    private PulsarSerializationSchema<IN, ?> serializationSchema;
+    private PartitionSelector<IN> partitionSelector;
+    private final Configuration configuration;
+
+    // private builder constructor.
+    PulsarSinkBuilder() {
+        this.configuration = new Configuration();
+        this.deliveryGuarantee = DeliveryGuarantee.EXACTLY_ONCE;
+    }
+
+    /**
+     * Sets the admin endpoint for the PulsarAdmin of the PulsarSink.
+     *
+     * @param adminUrl the url for the PulsarAdmin.
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> setAdminUrl(String adminUrl) {
+        return setConfig(PULSAR_ADMIN_URL, adminUrl);
+    }
+
+    /**
+     * Sets the server's link for the PulsarProducer of the PulsarSink.
+     *
+     * @param serviceUrl the server url of the Pulsar cluster.
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> setServiceUrl(String serviceUrl) {
+        return setConfig(PULSAR_SERVICE_URL, serviceUrl);
+    }
+
+    /**
+     * Set the topic for the PulsarSink.
+     *
+     * @param topic pulsar topic
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> setTopic(String topic) {
+        this.topicSelector = e -> topic;
+        return this;
+    }
+
+    /**
+     * set a topic selector for the PulsarSink.
+     *
+     * @param topicSelector select a topic by record.
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> setTopic(TopicSelector<IN> topicSelector) {
+        this.topicSelector = topicSelector;
+        return this;
+    }
+
+    /**
+     * Set a DeliverGuarantees for the PulsarSink.
+     *
+     * @param deliveryGuarantee deliver guarantees.
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> setDeliveryGuarantee(DeliveryGuarantee 
deliveryGuarantee) {
+        this.deliveryGuarantee = deliveryGuarantee;
+        return this;
+    }
+
+    /**
+     * SerializationSchema is required for getting the {@link Schema} for 
serialize message from
+     * pulsar and getting the {@link TypeInformation} for message 
serialization in flink.
+     *
+     * <p>We have defined a set of implementations, using {@code
+     * PulsarSerializationSchema#pulsarSchema} or {@code 
PulsarSerializationSchema#flinkSchema} for
+     * creating the desired schema.
+     */
+    public <T extends IN> PulsarSinkBuilder<T> setSerializationSchema(
+            PulsarSerializationSchema<T, ?> serializationSchema) {
+        PulsarSinkBuilder<T> self = specialized();
+        self.serializationSchema = serializationSchema;
+        return self;
+    }
+
+    /**
+     * Write the message to a partition of the Pulsar topic using a random 
method.
+     *
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> singlePartition() {
+        return setConfig(PULSAR_MESSAGE_ROUTING_MODE, 
MessageRoutingMode.SinglePartition);
+    }
+
+    /**
+     * Write the message to a partition of the Pulsar topic using a round 
Robin method.
+     *
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> roundRobinPartition() {
+        return setConfig(PULSAR_MESSAGE_ROUTING_MODE, 
MessageRoutingMode.RoundRobinPartition);
+    }
+
+    /**
+     * Write the message to a partition of the Pulsar topic using user defined 
method.
+     *
+     * @return this PulsarSinkBuilder.
+     */
+    public <T extends IN> PulsarSinkBuilder<T> customPartition(
+            PartitionSelector<T> partitionSelector) {
+        setConfig(PULSAR_MESSAGE_ROUTING_MODE, 
MessageRoutingMode.CustomPartition);
+        PulsarSinkBuilder<T> self = specialized();
+        self.partitionSelector = partitionSelector;
+        return self;
+    }
+
+    /**
+     * Build the {@link PulsarSink}.
+     *
+     * @return a PulsarSink with the settings made for this builder.
+     */
+    @SuppressWarnings("java:S3776")
+    public PulsarSink<IN> build() {
+        // Check builder configuration.
+        checkConfigurations(configuration);
+
+        // Ensure the topic  for pulsar.
+        checkNotNull(topicSelector, "No topic names or topic pattern are 
provided.");
+
+        if (DeliveryGuarantee.EXACTLY_ONCE == deliveryGuarantee) {
+            configuration.set(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true);
+            configuration.set(PulsarSinkOptions.PULSAR_SEND_TIMEOUT_MS, 0L);
+            if (!configuration.contains(PULSAR_TRANSACTION_TIMEOUT_MILLIS)) {
+                LOG.warn(
+                        "The default pulsar transaction timeout is 3 hours, "
+                                + "make sure it was greater than your 
checkpoint interval.");
+            } else {
+                Long timeout = 
configuration.get(PULSAR_TRANSACTION_TIMEOUT_MILLIS);
+                LOG.warn(
+                        "The configured transaction timeout is {} mille 
seconds, "

Review comment:
       ```suggestion
                           "The configured transaction timeout is {} 
milliseconds, "
   ```

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.Serializable;
+
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_FAIL_ON_WRITE;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS;
+
+/** The configure class for pulsar sink. */
+@PublicEvolving

Review comment:
       Why is this class marked as `@PublicEvolving`?

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils;
+import 
org.apache.flink.connector.pulsar.common.schema.IncompatibleSchemaException;
+import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkCommittable;
+import org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.AtLeastOnceDelivery;
+import org.apache.flink.connector.pulsar.sink.writer.delivery.Delivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.ExactlyOnceDelivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.selector.PartitionSelector;
+import org.apache.flink.connector.pulsar.sink.writer.selector.TopicSelector;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * a pulsar SinkWriter implement.
+ *
+ * @param <IN> record data type.
+ */
+@Internal
+public class PulsarWriter<IN> implements SinkWriter<IN, PulsarSinkCommittable, 
PulsarWriterState> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarWriter.class);
+
+    private transient Map<String, Producer<?>> topic2Producer;
+
+    private transient PulsarAdmin admin;
+
+    private transient BiConsumer<MessageId, Throwable> sendCallback;
+    private final AtomicLong pendingRecords = new AtomicLong();
+    //    private ConcurrentHashMap<TxnID, List<MessageId>> tid2MessagesMap;
+    //
+    //    private ConcurrentHashMap<TxnID, List<CompletableFuture<MessageId>>> 
tid2FuturesMap;
+
+    //    private Transaction currentTransaction;

Review comment:
       Remove

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils;
+import 
org.apache.flink.connector.pulsar.common.schema.IncompatibleSchemaException;
+import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkCommittable;
+import org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.AtLeastOnceDelivery;
+import org.apache.flink.connector.pulsar.sink.writer.delivery.Delivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.ExactlyOnceDelivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.selector.PartitionSelector;
+import org.apache.flink.connector.pulsar.sink.writer.selector.TopicSelector;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * a pulsar SinkWriter implement.
+ *
+ * @param <IN> record data type.
+ */
+@Internal
+public class PulsarWriter<IN> implements SinkWriter<IN, PulsarSinkCommittable, 
PulsarWriterState> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarWriter.class);
+
+    private transient Map<String, Producer<?>> topic2Producer;
+
+    private transient PulsarAdmin admin;
+
+    private transient BiConsumer<MessageId, Throwable> sendCallback;

Review comment:
       Why are these fields transient?

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.Serializable;
+
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_FAIL_ON_WRITE;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS;
+
+/** The configuration class for the Pulsar sink. */
+@PublicEvolving
+public class SinkConfiguration implements Serializable {

Review comment:
       Does it need to be serializable?

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils;
+import 
org.apache.flink.connector.pulsar.common.schema.IncompatibleSchemaException;
+import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkCommittable;
+import org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.AtLeastOnceDelivery;
+import org.apache.flink.connector.pulsar.sink.writer.delivery.Delivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.ExactlyOnceDelivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.selector.PartitionSelector;
+import org.apache.flink.connector.pulsar.sink.writer.selector.TopicSelector;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Pulsar {@link SinkWriter} implementation.
+ *
+ * @param <IN> the type of records written by this sink writer.
+ */
+@Internal
+public class PulsarWriter<IN> implements SinkWriter<IN, PulsarSinkCommittable, 
PulsarWriterState> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarWriter.class);
+
+    private transient Map<String, Producer<?>> topic2Producer;
+
+    private transient PulsarAdmin admin;
+
+    private transient BiConsumer<MessageId, Throwable> sendCallback;
+    private final AtomicLong pendingRecords = new AtomicLong();
+    //    private ConcurrentHashMap<TxnID, List<MessageId>> tid2MessagesMap;
+    //
+    //    private ConcurrentHashMap<TxnID, List<CompletableFuture<MessageId>>> 
tid2FuturesMap;
+
+    //    private Transaction currentTransaction;
+
+    private List<CompletableFuture<MessageId>> futures;
+    //    private ConcurrentHashMap<PulsarWriterState, 
List<CompletableFuture<MessageId>>>
+    // tid2FuturesMap;
+    // ---------------- //
+    private final Configuration configuration;
+    private final SinkConfiguration sinkConfiguration;
+    private final TopicSelector<IN> topicSelector;
+    private final PulsarSerializationSchema<IN, ?> serializationSchema;
+
+    private final PartitionSelector<IN> partitionSelector;
+    private final UserCodeClassLoader userCodeClassLoader;
+
+    private final Delivery delivery;
+
+    private transient PulsarClientImpl pulsarClient;
+
+    private final Closer closer = Closer.create();
+
+    public PulsarWriter(
+            DeliveryGuarantee deliveryGuarantee,
+            TopicSelector<IN> topicSelector,
+            PulsarSerializationSchema<IN, ?> serializationSchema,
+            PartitionSelector<IN> partitionSelector,
+            Configuration configuration,
+            Sink.InitContext sinkInitContext) {
+        this.topicSelector = topicSelector;
+        this.serializationSchema = serializationSchema;
+        this.sinkConfiguration = new SinkConfiguration(configuration);
+        this.partitionSelector = partitionSelector;
+        this.configuration = configuration;
+        this.userCodeClassLoader = sinkInitContext.getUserCodeClassLoader();
+        this.topic2Producer = new HashMap<>();
+
+        this.futures = Collections.synchronizedList(new ArrayList<>());
+        try {
+            admin = PulsarConfigUtils.createAdmin(configuration);
+            closer.register(admin);
+            serializationSchema.open(
+                    new SerializationSchema.InitializationContext() {
+                        @Override
+                        public MetricGroup getMetricGroup() {
+                            return null;

Review comment:
       You can get the metric group from the `Sink.InitContext` and you can 
pass it to the pulsar writer as a reference please have a look at the KafkaSink

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.config;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+
+import java.util.List;
+import java.util.Set;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.setOptionValue;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_ENABLED;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_PUBLISH_DELAY_MICROS;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BLOCK_IF_QUEUE_FULL;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_COMPRESSION_TYPE;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_CRYPTO_FAILURE_ACTION;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_ENABLE_CHUNKING;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_HASHING_SCHEME;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MESSAGE_ROUTING_MODE;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_NAME;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_SEND_TIMEOUT_MS;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TOPIC_NAME;
+
+/** Create sink related {@link ProducerBuilder} and validate the sink config. */
+@Internal
+public final class PulsarSinkConfigUtils {
+
+    private PulsarSinkConfigUtils() {
+        // No need to create instance.
+    }
+
+    private static final List<Set<ConfigOption<?>>> CONFLICT_SINK_OPTIONS =
+            ImmutableList.<Set<ConfigOption<?>>>builder()
+                    .add(ImmutableSet.of(PULSAR_AUTH_PARAMS, 
PULSAR_AUTH_PARAM_MAP))
+                    .build();
+
+    private static final Set<ConfigOption<?>> REQUIRED_SINK_OPTIONS =
+            ImmutableSet.<ConfigOption<?>>builder()
+                    .add(PULSAR_SERVICE_URL)
+                    .add(PULSAR_ADMIN_URL)
+                    .build();
+
+    /**
+     * Helper method for checking client related config options. We would 
validate:
+     *
+     * <ul>
+     *   <li>If user have provided the required client config options.
+     *   <li>If user have provided some conflict options.
+     * </ul>
+     */
+    public static void checkConfigurations(Configuration configuration) {
+        REQUIRED_SINK_OPTIONS.forEach(
+                option ->
+                        Preconditions.checkArgument(
+                                configuration.contains(option),
+                                "Config option %s is not provided for pulsar 
source.",
+                                option));
+
+        CONFLICT_SINK_OPTIONS.forEach(
+                options -> {
+                    long nums = 
options.stream().filter(configuration::contains).count();
+                    Preconditions.checkArgument(
+                            nums <= 1,
+                            "Conflict config options %s were provided, we only 
support one of them for creating pulsar source.",
+                            options);
+                });
+    }
+
+    /** Create a pulsar producer builder by using the given Configuration. */
+    public static <T> ProducerBuilder<T> createProducerBuilder(
+            PulsarClient client, Schema<T> schema, Configuration 
configuration) {
+        ProducerBuilder<T> builder = client.newProducer(schema);
+
+        setOptionValue(configuration, PULSAR_TOPIC_NAME, builder::topic);
+        setOptionValue(configuration, PULSAR_PRODUCER_NAME, 
builder::producerName);
+        setOptionValue(
+                configuration,
+                PULSAR_SEND_TIMEOUT_MS,
+                v -> builder.sendTimeout(v.intValue(), MILLISECONDS));
+        setOptionValue(configuration, PULSAR_BLOCK_IF_QUEUE_FULL, 
builder::blockIfQueueFull);
+        setOptionValue(configuration, PULSAR_MAX_PENDING_MESSAGES, 
builder::maxPendingMessages);
+        setOptionValue(
+                configuration,
+                PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS,
+                builder::maxPendingMessagesAcrossPartitions);
+        setOptionValue(configuration, PULSAR_MESSAGE_ROUTING_MODE, 
builder::messageRoutingMode);
+        setOptionValue(configuration, PULSAR_HASHING_SCHEME, 
builder::hashingScheme);

Review comment:
       Can we maybe share the configurations between the source and the sink?

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils;
+import 
org.apache.flink.connector.pulsar.common.schema.IncompatibleSchemaException;
+import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkCommittable;
+import org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.AtLeastOnceDelivery;
+import org.apache.flink.connector.pulsar.sink.writer.delivery.Delivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.ExactlyOnceDelivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.selector.PartitionSelector;
+import org.apache.flink.connector.pulsar.sink.writer.selector.TopicSelector;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Pulsar {@link SinkWriter} implementation.
+ *
+ * @param <IN> the type of records written by this sink writer.
+ */
+@Internal
+public class PulsarWriter<IN> implements SinkWriter<IN, PulsarSinkCommittable, 
PulsarWriterState> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarWriter.class);
+
+    private transient Map<String, Producer<?>> topic2Producer;
+
+    private transient PulsarAdmin admin;
+
+    private transient BiConsumer<MessageId, Throwable> sendCallback;
+    private final AtomicLong pendingRecords = new AtomicLong();
+    //    private ConcurrentHashMap<TxnID, List<MessageId>> tid2MessagesMap;
+    //
+    //    private ConcurrentHashMap<TxnID, List<CompletableFuture<MessageId>>> 
tid2FuturesMap;
+
+    //    private Transaction currentTransaction;
+
+    private List<CompletableFuture<MessageId>> futures;
+    //    private ConcurrentHashMap<PulsarWriterState, 
List<CompletableFuture<MessageId>>>
+    // tid2FuturesMap;
+    // ---------------- //
+    private final Configuration configuration;
+    private final SinkConfiguration sinkConfiguration;
+    private final TopicSelector<IN> topicSelector;
+    private final PulsarSerializationSchema<IN, ?> serializationSchema;
+
+    private final PartitionSelector<IN> partitionSelector;
+    private final UserCodeClassLoader userCodeClassLoader;
+
+    private final Delivery delivery;
+
+    private transient PulsarClientImpl pulsarClient;
+
+    private final Closer closer = Closer.create();
+
+    public PulsarWriter(
+            DeliveryGuarantee deliveryGuarantee,
+            TopicSelector<IN> topicSelector,
+            PulsarSerializationSchema<IN, ?> serializationSchema,
+            PartitionSelector<IN> partitionSelector,
+            Configuration configuration,
+            Sink.InitContext sinkInitContext) {
+        this.topicSelector = topicSelector;
+        this.serializationSchema = serializationSchema;
+        this.sinkConfiguration = new SinkConfiguration(configuration);
+        this.partitionSelector = partitionSelector;
+        this.configuration = configuration;
+        this.userCodeClassLoader = sinkInitContext.getUserCodeClassLoader();
+        this.topic2Producer = new HashMap<>();
+
+        this.futures = Collections.synchronizedList(new ArrayList<>());
+        try {
+            admin = PulsarConfigUtils.createAdmin(configuration);
+            closer.register(admin);
+            serializationSchema.open(
+                    new SerializationSchema.InitializationContext() {
+                        @Override
+                        public MetricGroup getMetricGroup() {
+                            return null;
+                        }
+
+                        @Override
+                        public UserCodeClassLoader getUserCodeClassLoader() {
+                            return userCodeClassLoader;
+                        }
+                    });
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        if (deliveryGuarantee == EXACTLY_ONCE) {
+            this.delivery = new ExactlyOnceDelivery(sinkConfiguration, 
this::getPulsarClient);
+        } else {
+            this.delivery = new AtLeastOnceDelivery();
+        }
+        closer.register(delivery);
+        this.sendCallback =
+                initializeSendCallback(sinkConfiguration, 
sinkInitContext.getMailboxExecutor());
+    }
+
+    @Override
+    public void write(IN value, Context context) throws IOException {
+        String topic = topicSelector.selector(value);
+        TypedMessageBuilder messageBuilder = 
delivery.newMessage(getProducer(topic));
+        serializationSchema.serialize(value, messageBuilder);
+
+        CompletableFuture<MessageId> messageIdFuture = 
messageBuilder.sendAsync();
+        pendingRecords.incrementAndGet();
+        futures.add(messageIdFuture);
+        messageIdFuture.whenComplete(sendCallback);
+    }
+
+    public void initializeState(List<PulsarWriterState> states) throws 
IOException {
+        checkNotNull(states, "The retrieved state was null.");
+        for (PulsarWriterState state : states) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Restoring: {}", state);
+            }
+        }
+    }
+
+    @Override
+    public List<PulsarSinkCommittable> prepareCommit(boolean flush) throws 
IOException {
+        if (!flush) {
+            return Collections.emptyList();
+        }
+        producerFlush();
+        List<PulsarSinkCommittable> committables = 
delivery.prepareCommit(flush);
+        LOG.debug("Committing {} committables, final commit={}.", 
committables, flush);
+        return committables;
+    }
+
    /**
     * Snapshots the writer state for the given checkpoint. The work is delegated
     * entirely to the configured delivery strategy.
     *
     * @param checkpointId the id of the checkpoint being taken
     * @return the writer states to persist for recovery
     * @throws IOException if the delivery strategy fails to snapshot its state
     */
    @Override
    public List<PulsarWriterState> snapshotState(long checkpointId) throws IOException {
        return delivery.snapshotState(checkpointId);
    }
+
    /**
     * Flushes all outstanding sends, then releases every resource registered with the
     * shared {@link Closer} (the admin client and the delivery strategy).
     */
    @Override
    public void close() throws Exception {
        producerFlush();
        closer.close();
    }
+
+    public void producerFlush() throws IOException {

Review comment:
       private?

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils;
+import 
org.apache.flink.connector.pulsar.common.schema.IncompatibleSchemaException;
+import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkCommittable;
+import org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.AtLeastOnceDelivery;
+import org.apache.flink.connector.pulsar.sink.writer.delivery.Delivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.ExactlyOnceDelivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.selector.PartitionSelector;
+import org.apache.flink.connector.pulsar.sink.writer.selector.TopicSelector;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Pulsar {@link SinkWriter} implementation.
+ *
+ * @param <IN> the type of records written by this sink writer.
+ */
+@Internal
+public class PulsarWriter<IN> implements SinkWriter<IN, PulsarSinkCommittable, 
PulsarWriterState> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarWriter.class);
+
+    private transient Map<String, Producer<?>> topic2Producer;
+
+    private transient PulsarAdmin admin;
+
+    private transient BiConsumer<MessageId, Throwable> sendCallback;
+    private final AtomicLong pendingRecords = new AtomicLong();
+    //    private ConcurrentHashMap<TxnID, List<MessageId>> tid2MessagesMap;
+    //
+    //    private ConcurrentHashMap<TxnID, List<CompletableFuture<MessageId>>> 
tid2FuturesMap;
+
+    //    private Transaction currentTransaction;
+
+    private List<CompletableFuture<MessageId>> futures;

Review comment:
       final

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils;
+import 
org.apache.flink.connector.pulsar.common.schema.IncompatibleSchemaException;
+import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkCommittable;
+import org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.AtLeastOnceDelivery;
+import org.apache.flink.connector.pulsar.sink.writer.delivery.Delivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.ExactlyOnceDelivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.selector.PartitionSelector;
+import org.apache.flink.connector.pulsar.sink.writer.selector.TopicSelector;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Pulsar SinkWriter implementation.
+ *
+ * @param <IN> record data type.
+ */
+@Internal
+public class PulsarWriter<IN> implements SinkWriter<IN, PulsarSinkCommittable, 
PulsarWriterState> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarWriter.class);
+
+    private transient Map<String, Producer<?>> topic2Producer;
+
+    private transient PulsarAdmin admin;
+
+    private transient BiConsumer<MessageId, Throwable> sendCallback;
+    private final AtomicLong pendingRecords = new AtomicLong();
+    //    private ConcurrentHashMap<TxnID, List<MessageId>> tid2MessagesMap;
+    //
+    //    private ConcurrentHashMap<TxnID, List<CompletableFuture<MessageId>>> 
tid2FuturesMap;
+
+    //    private Transaction currentTransaction;
+
+    private List<CompletableFuture<MessageId>> futures;
+    //    private ConcurrentHashMap<PulsarWriterState, 
List<CompletableFuture<MessageId>>>
+    // tid2FuturesMap;
+    // ---------------- //
+    private final Configuration configuration;
+    private final SinkConfiguration sinkConfiguration;
+    private final TopicSelector<IN> topicSelector;
+    private final PulsarSerializationSchema<IN, ?> serializationSchema;
+
+    private final PartitionSelector<IN> partitionSelector;
+    private final UserCodeClassLoader userCodeClassLoader;
+
+    private final Delivery delivery;
+
+    private transient PulsarClientImpl pulsarClient;
+
+    private final Closer closer = Closer.create();
+
+    public PulsarWriter(
+            DeliveryGuarantee deliveryGuarantee,
+            TopicSelector<IN> topicSelector,
+            PulsarSerializationSchema<IN, ?> serializationSchema,
+            PartitionSelector<IN> partitionSelector,
+            Configuration configuration,
+            Sink.InitContext sinkInitContext) {
+        this.topicSelector = topicSelector;
+        this.serializationSchema = serializationSchema;
+        this.sinkConfiguration = new SinkConfiguration(configuration);
+        this.partitionSelector = partitionSelector;
+        this.configuration = configuration;
+        this.userCodeClassLoader = sinkInitContext.getUserCodeClassLoader();
+        this.topic2Producer = new HashMap<>();
+
+        this.futures = Collections.synchronizedList(new ArrayList<>());
+        try {
+            admin = PulsarConfigUtils.createAdmin(configuration);
+            closer.register(admin);
+            serializationSchema.open(
+                    new SerializationSchema.InitializationContext() {
+                        @Override
+                        public MetricGroup getMetricGroup() {
+                            return null;
+                        }
+
+                        @Override
+                        public UserCodeClassLoader getUserCodeClassLoader() {
+                            return userCodeClassLoader;
+                        }
+                    });
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        if (deliveryGuarantee == EXACTLY_ONCE) {
+            this.delivery = new ExactlyOnceDelivery(sinkConfiguration, 
this::getPulsarClient);
+        } else {
+            this.delivery = new AtLeastOnceDelivery();
+        }
+        closer.register(delivery);
+        this.sendCallback =
+                initializeSendCallback(sinkConfiguration, 
sinkInitContext.getMailboxExecutor());
+    }
+
+    @Override
+    public void write(IN value, Context context) throws IOException {
+        String topic = topicSelector.selector(value);
+        TypedMessageBuilder messageBuilder = 
delivery.newMessage(getProducer(topic));
+        serializationSchema.serialize(value, messageBuilder);
+
+        CompletableFuture<MessageId> messageIdFuture = 
messageBuilder.sendAsync();
+        pendingRecords.incrementAndGet();
+        futures.add(messageIdFuture);
+        messageIdFuture.whenComplete(sendCallback);
+    }
+
+    public void initializeState(List<PulsarWriterState> states) throws 
IOException {
+        checkNotNull(states, "The retrieved state was null.");
+        for (PulsarWriterState state : states) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Restoring: {}", state);
+            }
+        }
+    }
+
+    @Override
+    public List<PulsarSinkCommittable> prepareCommit(boolean flush) throws 
IOException {
+        if (!flush) {
+            return Collections.emptyList();
+        }
+        producerFlush();
+        List<PulsarSinkCommittable> committables = 
delivery.prepareCommit(flush);
+        LOG.debug("Committing {} committables, final commit={}.", 
committables, flush);
+        return committables;
+    }
+
+    @Override
+    public List<PulsarWriterState> snapshotState(long checkpointId) throws 
IOException {
+        return delivery.snapshotState(checkpointId);
+    }
+
+    @Override
+    public void close() throws Exception {
+        producerFlush();
+        closer.close();
+    }
+
+    public void producerFlush() throws IOException {
+        for (Producer<?> p : topic2Producer.values()) {
+            p.flush();
+        }
+
+        for (CompletableFuture<MessageId> completableFuture : futures) {
+            try {
+                completableFuture.get();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } catch (ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        futures.clear();
+
+        if (pendingRecords.get() > 0) {
+            throw new RuntimeException("The message could not be sent");
+        }
+    }
+
+    // ------------------------------internal 
method------------------------------
+
+    private synchronized PulsarClientImpl getPulsarClient() {
+        if (pulsarClient != null) {
+            return pulsarClient;
+        }
+        pulsarClient = (PulsarClientImpl) 
PulsarConfigUtils.createClient(configuration);
+        closer.register(pulsarClient);
+        return pulsarClient;
+    }
+
+    protected Producer<?> getProducer(String topic) throws 
PulsarClientException {

Review comment:
       Why protected?

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils;
+import 
org.apache.flink.connector.pulsar.common.schema.IncompatibleSchemaException;
+import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkCommittable;
+import org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.AtLeastOnceDelivery;
+import org.apache.flink.connector.pulsar.sink.writer.delivery.Delivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.ExactlyOnceDelivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.selector.PartitionSelector;
+import org.apache.flink.connector.pulsar.sink.writer.selector.TopicSelector;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Pulsar SinkWriter implementation.
+ *
+ * @param <IN> record data type.
+ */
+@Internal
+public class PulsarWriter<IN> implements SinkWriter<IN, PulsarSinkCommittable, 
PulsarWriterState> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarWriter.class);
+
+    private transient Map<String, Producer<?>> topic2Producer;
+
+    private transient PulsarAdmin admin;
+
+    private transient BiConsumer<MessageId, Throwable> sendCallback;
+    private final AtomicLong pendingRecords = new AtomicLong();
+    //    private ConcurrentHashMap<TxnID, List<MessageId>> tid2MessagesMap;
+    //
+    //    private ConcurrentHashMap<TxnID, List<CompletableFuture<MessageId>>> 
tid2FuturesMap;
+
+    //    private Transaction currentTransaction;
+
+    private List<CompletableFuture<MessageId>> futures;
+    //    private ConcurrentHashMap<PulsarWriterState, 
List<CompletableFuture<MessageId>>>
+    // tid2FuturesMap;
+    // ---------------- //
+    private final Configuration configuration;
+    private final SinkConfiguration sinkConfiguration;
+    private final TopicSelector<IN> topicSelector;
+    private final PulsarSerializationSchema<IN, ?> serializationSchema;
+
+    private final PartitionSelector<IN> partitionSelector;
+    private final UserCodeClassLoader userCodeClassLoader;
+
+    private final Delivery delivery;
+
+    private transient PulsarClientImpl pulsarClient;
+
+    private final Closer closer = Closer.create();
+
+    public PulsarWriter(
+            DeliveryGuarantee deliveryGuarantee,
+            TopicSelector<IN> topicSelector,
+            PulsarSerializationSchema<IN, ?> serializationSchema,
+            PartitionSelector<IN> partitionSelector,
+            Configuration configuration,
+            Sink.InitContext sinkInitContext) {
+        this.topicSelector = topicSelector;
+        this.serializationSchema = serializationSchema;
+        this.sinkConfiguration = new SinkConfiguration(configuration);
+        this.partitionSelector = partitionSelector;
+        this.configuration = configuration;
+        this.userCodeClassLoader = sinkInitContext.getUserCodeClassLoader();
+        this.topic2Producer = new HashMap<>();
+
+        this.futures = Collections.synchronizedList(new ArrayList<>());
+        try {
+            admin = PulsarConfigUtils.createAdmin(configuration);
+            closer.register(admin);
+            serializationSchema.open(
+                    new SerializationSchema.InitializationContext() {
+                        @Override
+                        public MetricGroup getMetricGroup() {
+                            return null;
+                        }
+
+                        @Override
+                        public UserCodeClassLoader getUserCodeClassLoader() {
+                            return userCodeClassLoader;
+                        }
+                    });
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        if (deliveryGuarantee == EXACTLY_ONCE) {
+            this.delivery = new ExactlyOnceDelivery(sinkConfiguration, 
this::getPulsarClient);
+        } else {
+            this.delivery = new AtLeastOnceDelivery();
+        }
+        closer.register(delivery);
+        this.sendCallback =
+                initializeSendCallback(sinkConfiguration, 
sinkInitContext.getMailboxExecutor());
+    }
+
+    @Override
+    public void write(IN value, Context context) throws IOException {
+        String topic = topicSelector.selector(value);
+        TypedMessageBuilder messageBuilder = 
delivery.newMessage(getProducer(topic));
+        serializationSchema.serialize(value, messageBuilder);
+
+        CompletableFuture<MessageId> messageIdFuture = 
messageBuilder.sendAsync();
+        pendingRecords.incrementAndGet();
+        futures.add(messageIdFuture);
+        messageIdFuture.whenComplete(sendCallback);
+    }
+
+    public void initializeState(List<PulsarWriterState> states) throws 
IOException {
+        checkNotNull(states, "The retrieved state was null.");
+        for (PulsarWriterState state : states) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Restoring: {}", state);

Review comment:
       This feels incomplete

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils;
+import 
org.apache.flink.connector.pulsar.common.schema.IncompatibleSchemaException;
+import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkCommittable;
+import org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.AtLeastOnceDelivery;
+import org.apache.flink.connector.pulsar.sink.writer.delivery.Delivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.ExactlyOnceDelivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.selector.PartitionSelector;
+import org.apache.flink.connector.pulsar.sink.writer.selector.TopicSelector;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Pulsar SinkWriter implementation.
+ *
+ * @param <IN> record data type.
+ */
+@Internal
+public class PulsarWriter<IN> implements SinkWriter<IN, PulsarSinkCommittable, 
PulsarWriterState> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarWriter.class);
+
+    private transient Map<String, Producer<?>> topic2Producer;
+
+    private transient PulsarAdmin admin;
+
+    private transient BiConsumer<MessageId, Throwable> sendCallback;
+    private final AtomicLong pendingRecords = new AtomicLong();
+    //    private ConcurrentHashMap<TxnID, List<MessageId>> tid2MessagesMap;
+    //
+    //    private ConcurrentHashMap<TxnID, List<CompletableFuture<MessageId>>> 
tid2FuturesMap;
+
+    //    private Transaction currentTransaction;
+
+    private List<CompletableFuture<MessageId>> futures;
+    //    private ConcurrentHashMap<PulsarWriterState, 
List<CompletableFuture<MessageId>>>
+    // tid2FuturesMap;
+    // ---------------- //
+    private final Configuration configuration;
+    private final SinkConfiguration sinkConfiguration;
+    private final TopicSelector<IN> topicSelector;
+    private final PulsarSerializationSchema<IN, ?> serializationSchema;
+
+    private final PartitionSelector<IN> partitionSelector;
+    private final UserCodeClassLoader userCodeClassLoader;
+
+    private final Delivery delivery;
+
+    private transient PulsarClientImpl pulsarClient;
+
+    private final Closer closer = Closer.create();
+
+    public PulsarWriter(
+            DeliveryGuarantee deliveryGuarantee,
+            TopicSelector<IN> topicSelector,
+            PulsarSerializationSchema<IN, ?> serializationSchema,
+            PartitionSelector<IN> partitionSelector,
+            Configuration configuration,
+            Sink.InitContext sinkInitContext) {
+        this.topicSelector = topicSelector;
+        this.serializationSchema = serializationSchema;
+        this.sinkConfiguration = new SinkConfiguration(configuration);
+        this.partitionSelector = partitionSelector;
+        this.configuration = configuration;
+        this.userCodeClassLoader = sinkInitContext.getUserCodeClassLoader();
+        this.topic2Producer = new HashMap<>();
+
+        this.futures = Collections.synchronizedList(new ArrayList<>());
+        try {
+            admin = PulsarConfigUtils.createAdmin(configuration);
+            closer.register(admin);
+            serializationSchema.open(
+                    new SerializationSchema.InitializationContext() {
+                        @Override
+                        public MetricGroup getMetricGroup() {
+                            return null;
+                        }
+
+                        @Override
+                        public UserCodeClassLoader getUserCodeClassLoader() {
+                            return userCodeClassLoader;
+                        }
+                    });
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        if (deliveryGuarantee == EXACTLY_ONCE) {
+            this.delivery = new ExactlyOnceDelivery(sinkConfiguration, 
this::getPulsarClient);
+        } else {
+            this.delivery = new AtLeastOnceDelivery();
+        }
+        closer.register(delivery);
+        this.sendCallback =
+                initializeSendCallback(sinkConfiguration, 
sinkInitContext.getMailboxExecutor());
+    }
+
+    @Override
+    public void write(IN value, Context context) throws IOException {
+        String topic = topicSelector.selector(value);
+        TypedMessageBuilder messageBuilder = 
delivery.newMessage(getProducer(topic));
+        serializationSchema.serialize(value, messageBuilder);
+
+        CompletableFuture<MessageId> messageIdFuture = 
messageBuilder.sendAsync();
+        pendingRecords.incrementAndGet();
+        futures.add(messageIdFuture);
+        messageIdFuture.whenComplete(sendCallback);

Review comment:
       Please take a look at how the `AsyncSinkWriter` uses the mailbox to 
enqueue asynchronous tasks. By using the mailbox you can get rid of all the 
manual synchronization because the mailbox executes the writer and your 
callbacks in the same thread.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.Serializable;
+
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_FAIL_ON_WRITE;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS;
+
+/** The configure class for pulsar sink. */
+@PublicEvolving
+public class SinkConfiguration implements Serializable {

Review comment:
       You can probably also remove this class completely and only pass the 
`failOnWrite` and `transactionTimeout` to the downstream class.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils;
+import 
org.apache.flink.connector.pulsar.common.schema.IncompatibleSchemaException;
+import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkCommittable;
+import org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.AtLeastOnceDelivery;
+import org.apache.flink.connector.pulsar.sink.writer.delivery.Delivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.delivery.ExactlyOnceDelivery;
+import 
org.apache.flink.connector.pulsar.sink.writer.selector.PartitionSelector;
+import org.apache.flink.connector.pulsar.sink.writer.selector.TopicSelector;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Pulsar {@link SinkWriter} implementation.
+ *
+ * @param <IN> record data type.
+ */
+@Internal
+public class PulsarWriter<IN> implements SinkWriter<IN, PulsarSinkCommittable, 
PulsarWriterState> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarWriter.class);
+
+    private transient Map<String, Producer<?>> topic2Producer;
+
+    private transient PulsarAdmin admin;
+
+    private transient BiConsumer<MessageId, Throwable> sendCallback;
+    private final AtomicLong pendingRecords = new AtomicLong();
+    //    private ConcurrentHashMap<TxnID, List<MessageId>> tid2MessagesMap;
+    //
+    //    private ConcurrentHashMap<TxnID, List<CompletableFuture<MessageId>>> 
tid2FuturesMap;
+
+    //    private Transaction currentTransaction;
+
+    private List<CompletableFuture<MessageId>> futures;
+    //    private ConcurrentHashMap<PulsarWriterState, 
List<CompletableFuture<MessageId>>>
+    // tid2FuturesMap;
+    // ---------------- //
+    private final Configuration configuration;
+    private final SinkConfiguration sinkConfiguration;
+    private final TopicSelector<IN> topicSelector;
+    private final PulsarSerializationSchema<IN, ?> serializationSchema;
+
+    private final PartitionSelector<IN> partitionSelector;
+    private final UserCodeClassLoader userCodeClassLoader;
+
+    private final Delivery delivery;
+
+    private transient PulsarClientImpl pulsarClient;
+
+    private final Closer closer = Closer.create();
+
+    public PulsarWriter(
+            DeliveryGuarantee deliveryGuarantee,
+            TopicSelector<IN> topicSelector,
+            PulsarSerializationSchema<IN, ?> serializationSchema,
+            PartitionSelector<IN> partitionSelector,
+            Configuration configuration,
+            Sink.InitContext sinkInitContext) {
+        this.topicSelector = topicSelector;
+        this.serializationSchema = serializationSchema;
+        this.sinkConfiguration = new SinkConfiguration(configuration);
+        this.partitionSelector = partitionSelector;
+        this.configuration = configuration;
+        this.userCodeClassLoader = sinkInitContext.getUserCodeClassLoader();
+        this.topic2Producer = new HashMap<>();
+
+        this.futures = Collections.synchronizedList(new ArrayList<>());
+        try {
+            admin = PulsarConfigUtils.createAdmin(configuration);
+            closer.register(admin);
+            serializationSchema.open(
+                    new SerializationSchema.InitializationContext() {
+                        @Override
+                        public MetricGroup getMetricGroup() {
+                            return null;
+                        }
+
+                        @Override
+                        public UserCodeClassLoader getUserCodeClassLoader() {
+                            return userCodeClassLoader;
+                        }
+                    });
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        if (deliveryGuarantee == EXACTLY_ONCE) {
+            this.delivery = new ExactlyOnceDelivery(sinkConfiguration, 
this::getPulsarClient);
+        } else {
+            this.delivery = new AtLeastOnceDelivery();
+        }
+        closer.register(delivery);
+        this.sendCallback =
+                initializeSendCallback(sinkConfiguration, 
sinkInitContext.getMailboxExecutor());
+    }
+
    /**
     * Serializes the record and sends it asynchronously to the topic chosen by the topic
     * selector. The send future is tracked so {@code producerFlush()} can wait on it.
     *
     * @param value the record to write.
     * @param context the writer context (unused here).
     * @throws IOException if the producer for the target topic cannot be created.
     */
    @Override
    public void write(IN value, Context context) throws IOException {
        String topic = topicSelector.selector(value);
        // NOTE(review): raw TypedMessageBuilder — consider parameterizing to avoid
        // unchecked usage; confirm against PulsarSerializationSchema#serialize's signature.
        TypedMessageBuilder messageBuilder = delivery.newMessage(getProducer(topic));
        serializationSchema.serialize(value, messageBuilder);

        CompletableFuture<MessageId> messageIdFuture = messageBuilder.sendAsync();
        // Track in-flight sends; presumably the sendCallback decrements this counter on
        // completion — confirm in initializeSendCallback.
        pendingRecords.incrementAndGet();
        futures.add(messageIdFuture);
        messageIdFuture.whenComplete(sendCallback);
    }
+
+    public void initializeState(List<PulsarWriterState> states) throws 
IOException {
+        checkNotNull(states, "The retrieved state was null.");
+        for (PulsarWriterState state : states) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Restoring: {}", state);
+            }
+        }
+    }
+
    /**
     * Flushes all outstanding sends and asks the delivery strategy for the committables of
     * this checkpoint. When {@code flush} is false nothing is forced out and an empty list
     * is returned.
     *
     * @param flush whether pending data must be flushed before collecting committables.
     * @return the committables handed to the committer; empty when {@code flush} is false.
     * @throws IOException if flushing fails or a pending send could not complete.
     */
    @Override
    public List<PulsarSinkCommittable> prepareCommit(boolean flush) throws IOException {
        if (!flush) {
            return Collections.emptyList();
        }
        producerFlush();
        List<PulsarSinkCommittable> committables = delivery.prepareCommit(flush);
        // flush is always true at this point; logged for traceability.
        LOG.debug("Committing {} committables, final commit={}.", committables, flush);
        return committables;
    }
+
+    @Override
+    public List<PulsarWriterState> snapshotState(long checkpointId) throws 
IOException {
+        return delivery.snapshotState(checkpointId);
+    }
+
    /**
     * Flushes all pending messages, then closes every registered resource (admin client,
     * delivery, Pulsar client, producers) via the shared {@code Closer}.
     *
     * @throws Exception if flushing or closing a registered resource fails.
     */
    @Override
    public void close() throws Exception {
        producerFlush();
        closer.close();
    }
+
+    public void producerFlush() throws IOException {
+        for (Producer<?> p : topic2Producer.values()) {
+            p.flush();
+        }
+
+        for (CompletableFuture<MessageId> completableFuture : futures) {
+            try {
+                completableFuture.get();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } catch (ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        futures.clear();
+
+        if (pendingRecords.get() > 0) {
+            throw new RuntimeException("The message could not be sent");
+        }
+    }
+
+    // ------------------------------internal 
method------------------------------
+
+    private synchronized PulsarClientImpl getPulsarClient() {
+        if (pulsarClient != null) {
+            return pulsarClient;
+        }
+        pulsarClient = (PulsarClientImpl) 
PulsarConfigUtils.createClient(configuration);
+        closer.register(pulsarClient);
+        return pulsarClient;
+    }
+
    /**
     * Returns a producer for the given topic, reusing the cached one only while it reports
     * being connected; otherwise a new producer is built and replaces it in the cache.
     *
     * @param topic the fully qualified topic name.
     * @return a connected producer for the topic.
     * @throws PulsarClientException if the producer cannot be created.
     */
    protected Producer<?> getProducer(String topic) throws PulsarClientException {
        Producer<?> producer = topic2Producer.get(topic);
        if (producer != null && producer.isConnected()) {
            return producer;
        }

        final ProducerBuilder<?> producerBuilder =
                PulsarSinkConfigUtils.createProducerBuilder(
                        getPulsarClient(), serializationSchema.getSchema(), configuration);
        producerBuilder.topic(topic);

        // Install a custom router only when the user supplied a partition selector.
        if (partitionSelector != null) {
            producerBuilder.messageRouter(generateMessageRouter());
        }
        producer = producerBuilder.create();
        // NOTE(review): a replaced, disconnected producer stays registered in the closer but
        // is never closed eagerly — confirm this cannot accumulate over a long-running job.
        closer.register(producer);
        uploadSchema(topic);
        topic2Producer.put(topic, producer);
        return producer;
    }
+
+    @SuppressWarnings("unchecked")
+    private MessageRouter generateMessageRouter() {
+        return new MessageRouter() {
+            @Override
+            public int choosePartition(Message<?> msg, TopicMetadata metadata) 
{
+                return partitionSelector.select(
+                        (Message<IN>) msg,
+                        new 
org.apache.flink.connector.pulsar.sink.writer.selector.TopicMetadata(
+                                metadata.numPartitions()));
+            }
+        };
+    }
+
+    protected BiConsumer<MessageId, Throwable> initializeSendCallback(

Review comment:
       protected?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to