fapaul commented on a change in pull request #17452:
URL: https://github.com/apache/flink/pull/17452#discussion_r798579396



##########
File path: 
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
##########
@@ -113,6 +113,11 @@ 
org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(org.apache.flink.ap
 org.apache.flink.connector.kafka.sink.KafkaSink.getCommittableSerializer(): Returned leaf type org.apache.flink.connector.kafka.sink.KafkaCommittable does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
 org.apache.flink.connector.kafka.sink.KafkaSink.getWriterStateSerializer(): Returned leaf type org.apache.flink.connector.kafka.sink.KafkaWriterState does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
 org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.getPartitionOffsets(java.util.Collection, org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer$PartitionOffsetsRetriever): Argument leaf type org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer$PartitionOffsetsRetriever does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
+org.apache.flink.connector.pulsar.sink.PulsarSink.createCommitter(): Returned leaf type org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
+org.apache.flink.connector.pulsar.sink.PulsarSink.createWriter(org.apache.flink.api.connector.sink2.Sink$InitContext): Returned leaf type org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
+org.apache.flink.connector.pulsar.sink.PulsarSink.getCommittableSerializer(): Returned leaf type org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
+org.apache.flink.connector.pulsar.sink.writer.message.MessageKeyHash.getDescription(): Returned leaf type org.apache.flink.configuration.description.InlineElement does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
+org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode.getDescription(): Returned leaf type org.apache.flink.configuration.description.InlineElement does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated

Review comment:
       I do not think we should add new violations. 
   
   For example, for the sink-related methods, you can add the `@Internal` 
annotation to the methods that create the writer and committer.
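
   A rough sketch of what I mean, not the actual connector code. The 
`Internal` annotation below is a local stand-in for 
`org.apache.flink.annotation.Internal`, and `DemoSink`, `DemoWriter` and 
`DemoCommitter` are hypothetical simplified types. The runtime retention is 
only there so the demo can inspect the annotation via reflection.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Stand-in for org.apache.flink.annotation.Internal so the sketch compiles on its own.
// RUNTIME retention is an assumption made only for the reflection check below.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.CONSTRUCTOR})
@interface Internal {}

// Hypothetical, simplified stand-ins for the connector types.
final class DemoWriter {}

final class DemoCommitter {}

final class DemoSink {
    // Only the framework calls these factory methods, so they are marked
    // @Internal instead of exposing their return types as user-facing API.
    @Internal
    DemoWriter createWriter() {
        return new DemoWriter();
    }

    @Internal
    DemoCommitter createCommitter() {
        return new DemoCommitter();
    }

    // A user-facing method stays without the annotation.
    String describe() {
        return "demo sink";
    }
}

final class InternalAnnotationDemo {
    // Returns true if the named no-arg method of DemoSink carries @Internal.
    static boolean isInternal(String methodName) {
        try {
            return DemoSink.class
                    .getDeclaredMethod(methodName)
                    .isAnnotationPresent(Internal.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("createWriter internal: " + isInternal("createWriter"));
        System.out.println("describe internal: " + isInternal("describe"));
    }
}
```

   Whether this is enough to satisfy the architecture rule for every listed 
method is something to verify by re-running the ArchUnit tests.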

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfiguration.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.config;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+
+/**
+ * An unmodifiable {@link Configuration} for Pulsar. We provide extra method for building the
+ * different Pulsar client instance.
+ */
+public abstract class PulsarConfiguration extends UnmodifiableConfiguration {

Review comment:
       I like the idea of providing a dedicated connector configuration. It 
definitely makes the DataStream to Table transition easier. 

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSchemaWrapper.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.serializer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.pulsar.common.schema.PulsarSchema;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage;
+
+import org.apache.pulsar.client.api.Schema;
+
+/** Wrap the Pulsar's Schema into PulsarSerializationSchema. */
+@Internal
+public class PulsarSchemaWrapper<IN> implements PulsarSerializationSchema<IN> {
+    private static final long serialVersionUID = -2567052498398184194L;
+
+    private static final byte[] EMPTY_BYTES = new byte[0];
+    private final PulsarSchema<IN> pulsarSchema;
+
+    public PulsarSchemaWrapper(PulsarSchema<IN> pulsarSchema) {
+        this.pulsarSchema = pulsarSchema;
+    }
+
+    @Override
+    public RawMessage<byte[]> serialize(IN element, PulsarSinkContext sinkContext) {
+        RawMessage<byte[]> message;
+
+        if (sinkContext.isEnableSchemaEvolution()) {
+            // We don't need to serialize incoming records in schema evolution.
+            message = new RawMessage<>(EMPTY_BYTES);

Review comment:
       Can you explain why the incoming record does not need to be serialized 
when schema evolution is enabled?

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/MessageKeyHash.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.message;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
+
+import org.apache.pulsar.client.impl.Hash;
+import org.apache.pulsar.client.impl.JavaStringHash;
+import org.apache.pulsar.client.impl.Murmur3_32Hash;
+
+import static org.apache.flink.configuration.description.LinkElement.link;
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/** Predefined the available hash function for routing the message. */
+@PublicEvolving

Review comment:
       Should users interact with this class?

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.topic;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction;
+import static org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils.createProducerBuilder;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * All the Pulsar Producer share the same Client, but self hold the queue for specified topic. So we
+ * have to create different instance for different topic.
+ */
+@Internal
+public class TopicProducerRegister implements Closeable {
+
+    private final PulsarClient pulsarClient;
+    private final SinkConfiguration sinkConfiguration;
+    private final Schema<?> schema;
+    private final ConcurrentHashMap<String, Producer<?>> producerRegister;
+    private final ConcurrentHashMap<String, Transaction> transactionRegister;

Review comment:
       Do you need the synchronization here?
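
   To illustrate the question: if the register is only ever touched from a 
single thread (an assumption that should be checked against how the writer is 
actually invoked), a plain `HashMap` gives the same create-once behaviour. 
`SingleThreadedRegister` is a hypothetical simplification in which a `String` 
stands in for the Pulsar `Producer`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified register: a String stands in for a Pulsar Producer.
final class SingleThreadedRegister {
    // A plain HashMap is sufficient as long as only one thread uses the register.
    private final Map<String, String> producers = new HashMap<>();
    private int created = 0;

    // Create the per-topic entry on first access and reuse it afterwards.
    String getOrCreate(String topic) {
        return producers.computeIfAbsent(
                topic,
                t -> {
                    created++;
                    return "producer-for-" + t;
                });
    }

    int createdCount() {
        return created;
    }
}

final class SingleThreadedRegisterDemo {
    public static void main(String[] args) {
        SingleThreadedRegister register = new SingleThreadedRegister();
        register.getOrCreate("topic-a");
        register.getOrCreate("topic-a");
        register.getOrCreate("topic-b");
        System.out.println("created entries: " + register.createdCount());
    }
}
```

   If another thread does access the maps, the concurrent variant should 
stay, ideally with a comment that says which thread.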

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittableSerializer;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommitter;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.PulsarWriter;
+import org.apache.flink.connector.pulsar.sink.writer.router.KeyHashTopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.function.SerializableFunction;
+
+/**
+ * The Sink implementation of Pulsar. Please use a {@link PulsarSinkBuilder} to construct a {@link
+ * PulsarSink}. The following example shows how to create a PulsarSink receiving records of {@code
+ * String} type.
+ *
+ * <pre>{@code
+ * PulsarSink<String> sink = PulsarSink.builder()
+ *      .setServiceUrl(operator().serviceUrl())
+ *      .setAdminUrl(operator().adminUrl())
+ *      .setTopic(topic)
+ *      .setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING))
+ *      .build();
+ * }</pre>
+ *
+ * <p>The sink supports all delivery guarantees described by {@link DeliveryGuarantee}.
+ *
+ * <ul>
+ *   <li>{@link DeliveryGuarantee#NONE} does not provide any guarantees: messages may be lost in
+ *       case of issues on the Pulsar broker and messages may be duplicated in case of a Flink
+ *       failure.
+ *   <li>{@link DeliveryGuarantee#AT_LEAST_ONCE} the sink will wait for all outstanding records in
+ *       the Pulsar buffers to be acknowledged by the Pulsar producer on a checkpoint. No messages
+ *       will be lost in case of any issue with the Pulsar brokers but messages may be duplicated
+ *       when Flink restarts.
+ *   <li>{@link DeliveryGuarantee#EXACTLY_ONCE}: In this mode the PulsarSink will write all messages
+ *       in a Pulsar transaction that will be committed to Pulsar on a checkpoint. Thus, no
+ *       duplicates will be seen in case of a Flink restart. However, this delays record writing
+ *       effectively until a checkpoint is written, so adjust the checkpoint duration accordingly.
+ *       Additionally, it is highly recommended to tweak Pulsar transaction timeout (link) >>
+ *       maximum checkpoint duration + maximum restart duration or data loss may happen when Pulsar
+ *       expires an uncommitted transaction.
+ * </ul>
+ *
+ * <p>See {@link PulsarSinkBuilder} for more details.
+ *
+ * @param <IN> The input type of the sink.
+ */
+@PublicEvolving
+public class PulsarSink<IN> implements TwoPhaseCommittingSink<IN, PulsarCommittable> {
+    private static final long serialVersionUID = 4416714587951282119L;
+
+    private final SinkConfiguration sinkConfiguration;
+    private final PulsarSerializationSchema<IN> serializationSchema;
+    private final TopicMetadataListener metadataListener;
+    private final SerializableFunction<SinkConfiguration, TopicRouter<IN>> topicRouterProvider;
+
+    PulsarSink(
+            SinkConfiguration sinkConfiguration,
+            PulsarSerializationSchema<IN> serializationSchema,
+            TopicMetadataListener metadataListener,
+            TopicRoutingMode topicRoutingMode,
+            TopicRouter<IN> topicRouter) {
+        this.sinkConfiguration = sinkConfiguration;
+        this.serializationSchema = serializationSchema;
+        this.metadataListener = metadataListener;

Review comment:
       Nit: please use `checkNotNull` for ctor inputs.
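
   Something along these lines. This is a self-contained sketch: the static 
`checkNotNull` here is a local stand-in that mirrors what 
`org.apache.flink.util.Preconditions.checkNotNull` does, and 
`NullCheckedSink` with its two fields is a hypothetical reduction of the real 
constructor.

```java
// Hypothetical reduction of the sink constructor with null-checked inputs.
final class NullCheckedSink {
    private final String sinkConfiguration;
    private final String serializationSchema;

    NullCheckedSink(String sinkConfiguration, String serializationSchema) {
        // Fail fast at construction time instead of with a late NPE in the writer.
        this.sinkConfiguration = checkNotNull(sinkConfiguration, "sinkConfiguration");
        this.serializationSchema = checkNotNull(serializationSchema, "serializationSchema");
    }

    // Local stand-in for Flink's Preconditions.checkNotNull(reference, errorMessage).
    static <T> T checkNotNull(T reference, String errorMessage) {
        if (reference == null) {
            throw new NullPointerException(errorMessage);
        }
        return reference;
    }

    String describe() {
        return sinkConfiguration + "/" + serializationSchema;
    }
}

final class NullCheckedSinkDemo {
    public static void main(String[] args) {
        System.out.println(new NullCheckedSink("config", "schema").describe());
        try {
            new NullCheckedSink(null, "schema");
        } catch (NullPointerException e) {
            System.out.println("rejected null input: " + e.getMessage());
        }
    }
}
```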

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java
##########
@@ -18,101 +18,55 @@
 
 package org.apache.flink.connector.pulsar.common.utils;
 
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.FlinkRuntimeException;
 
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.transaction.Transaction;
-import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
 
-/**
- * Transaction was introduced into pulsar since 2.7.0, but the interface {@link Transaction} didn't
- * provide a id method until 2.8.1. We have to add this util for acquiring the {@link TxnID} for
- * compatible consideration.
- *
- * <p>TODO Remove this hack after pulsar 2.8.1 release.
- */
-@Internal
-@SuppressWarnings("java:S3011")
+/** A suit of workarounds for the Pulsar Transaction. */
 public final class PulsarTransactionUtils {
 
-    private static volatile Field mostBitsField;
-    private static volatile Field leastBitsField;
-
     private PulsarTransactionUtils() {
         // No public constructor
     }
 
-    public static TxnID getId(Transaction transaction) {
-        // 2.8.1 and after.
-        try {
-            Method getId = Transaction.class.getDeclaredMethod("getTxnID");
-            return (TxnID) getId.invoke(transaction);
-        } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
-            // 2.8.0 and before.
-            TransactionImpl impl = (TransactionImpl) transaction;
-            Long txnIdMostBits = getTxnIdMostBits(impl);
-            Long txnIdLeastBits = getTxnIdLeastBits(impl);
-
-            checkNotNull(txnIdMostBits, "Failed to get txnIdMostBits");
-            checkNotNull(txnIdLeastBits, "Failed to get txnIdLeastBits");
-
-            return new TxnID(txnIdMostBits, txnIdLeastBits);
-        }
-    }
-
-    private static Long getTxnIdMostBits(TransactionImpl transaction) {
-        if (mostBitsField == null) {
-            synchronized (PulsarTransactionUtils.class) {
-                if (mostBitsField == null) {
-                    try {
-                        mostBitsField = TransactionImpl.class.getDeclaredField("txnIdMostBits");
-                        mostBitsField.setAccessible(true);
-                    } catch (NoSuchFieldException e) {
-                        // Nothing to do for this exception.
-                    }
-                }
-            }
-        }
+    public static Transaction createTransaction(PulsarClient pulsarClient, long timeoutMs) {
 
-        if (mostBitsField != null) {
-            try {
-                return (Long) mostBitsField.get(transaction);
-            } catch (IllegalAccessException e) {
-                // Nothing to do for this exception.
-            }
+        try {
+            CompletableFuture<Transaction> future =
+                    sneakyClient(pulsarClient::newTransaction)
+                            .withTransactionTimeout(timeoutMs, TimeUnit.MILLISECONDS)
+                            .build();
+
+            return future.get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException(e);
+        } catch (ExecutionException e) {
+            throw new FlinkRuntimeException(e);
         }
-
-        return null;
     }
 
-    private static Long getTxnIdLeastBits(TransactionImpl transaction) {
-        if (leastBitsField == null) {
-            synchronized (PulsarTransactionUtils.class) {
-                if (leastBitsField == null) {
-                    try {
-                        leastBitsField = TransactionImpl.class.getDeclaredField("txnIdLeastBits");
-                        leastBitsField.setAccessible(true);
-                    } catch (NoSuchFieldException e) {
-                        // Nothing to do for this exception.
-                    }
-                }
+    /**
+     * This is a bug in original {@link TransactionCoordinatorClientException#unwrap(Throwable)}
+     * method. Pulsar wraps the {@link ExecutionException} which hides the read execution exception.
+     */
+    public static TransactionCoordinatorClientException unwrap(
+            TransactionCoordinatorClientException e) {

Review comment:
       I think you can use `ExceptionUtils#findThrowable`
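
   For reference, the idea is to walk the cause chain and pick out the first 
exception of the wanted type. The `findThrowable` below is a local stand-in 
modelled on Flink's `ExceptionUtils.findThrowable(Throwable, Class)`, written 
out only so the sketch runs on its own; `CauseChain` is a hypothetical holder 
class.

```java
import java.util.Optional;
import java.util.concurrent.ExecutionException;

final class CauseChain {
    // Local stand-in modelled on Flink's ExceptionUtils.findThrowable(Throwable, Class):
    // returns the first throwable in the cause chain that matches the given type.
    static <T extends Throwable> Optional<T> findThrowable(
            Throwable throwable, Class<T> searchType) {
        Throwable current = throwable;
        while (current != null) {
            if (searchType.isInstance(current)) {
                return Optional.of(searchType.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }
}

final class CauseChainDemo {
    public static void main(String[] args) {
        // The interesting exception is hidden behind an ExecutionException wrapper.
        Throwable wrapped =
                new RuntimeException(
                        new ExecutionException(new IllegalStateException("txn failed")));

        CauseChain.findThrowable(wrapped, IllegalStateException.class)
                .ifPresent(e -> System.out.println("found: " + e.getMessage()));
    }
}
```

   With the real utility the custom `unwrap` might become unnecessary, but 
that depends on which exception types Pulsar actually nests here.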

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextAdapter;
+import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.util.IOUtils.closeAll;
+
+/**
+ * This class is responsible to write records in a Pulsar topic and to handle the different delivery
+ * {@link DeliveryGuarantee}s.
+ *
+ * @param <IN> The type of the input elements.
+ */
+public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommittable> {

Review comment:
       Is this only public for the docstring in the `SinkConfiguration`? If it 
stays public, it needs an annotation, probably `@Internal`.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.topic;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction;
+import static org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils.createProducerBuilder;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * All the Pulsar Producer share the same Client, but self hold the queue for specified topic. So we
+ * have to create different instance for different topic.
+ */
+@Internal
+public class TopicProducerRegister implements Closeable {
+
+    private final PulsarClient pulsarClient;
+    private final SinkConfiguration sinkConfiguration;
+    private final Schema<?> schema;
+    private final ConcurrentHashMap<String, Producer<?>> producerRegister;
+    private final ConcurrentHashMap<String, Transaction> transactionRegister;
+
+    public TopicProducerRegister(
+            SinkConfiguration sinkConfiguration, PulsarSerializationSchema<?> serializationSchema) {
+        this.pulsarClient = createClient(sinkConfiguration);
+        this.sinkConfiguration = sinkConfiguration;
+        if (sinkConfiguration.isEnableSchemaEvolution()) {
+            // Use this schema would send it to Pulsar and perform validation.
+            this.schema = serializationSchema.schema();
+        } else {
+            // We would serialize message by flink's policy and send byte array to Pulsar.
+            this.schema = Schema.BYTES;
+        }
+        this.producerRegister = new ConcurrentHashMap<>();
+        this.transactionRegister = new ConcurrentHashMap<>();
+    }
+
+    /** Create or return the cached topic related producer. */
+    public Producer<?> getOrCreateProducer(String topic) {
+        return producerRegister.computeIfAbsent(
+                topic,
+                t -> {
+                    ProducerBuilder<?> builder =
+                            createProducerBuilder(pulsarClient, schema, sinkConfiguration);
+                    // Set the required topic name.
+                    builder.topic(t);
+                    return sneakyClient(builder::create);
+                });
+    }
+
+    /**
+     * Get the cached topic related transaction. Or create a new transaction after checkpointing.
+     */
+    public Transaction getOrCreateTransaction(String topic) {
+        return transactionRegister.computeIfAbsent(
+                topic,
+                t -> {
+                    long timeoutMillis = sinkConfiguration.getTransactionTimeoutMillis();
+                    return createTransaction(pulsarClient, timeoutMillis);
+                });
+    }
+
+    /** Abort the existed transactions. This method would be used when close PulsarWriter. */
+    private void abortTransactions() {
+        if (transactionRegister.isEmpty()) {
+            return;
+        }
+
+        TransactionCoordinatorClient coordinatorClient =
+                ((PulsarClientImpl) pulsarClient).getTcClient();
+        // This null check is used for making sure transaction is enabled in client.
+        checkNotNull(coordinatorClient);
+
+        try (Closer closer = Closer.create()) {
+            for (Transaction transaction : transactionRegister.values()) {
+                TxnID txnID = transaction.getTxnID();
+                closer.register(() -> coordinatorClient.abort(txnID));
+            }
+
+            clearTransactions();
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    /** Clean these transactions. All transactions should be passed to Pulsar committer. */
+    private void clearTransactions() {
+        // Clear the transactions, we would create new transaction when new message comes.
+        transactionRegister.clear();
+    }
+
+    /**
+     * Convert the transactions into committable list for Pulsar Committer. The transactions would
+     * be removed until flink triggering a checkpoint.
+     */
+    public List<PulsarCommittable> prepareCommit() {
+        List<PulsarCommittable> committables = new ArrayList<>(transactionRegister.size());
+        transactionRegister.forEach(
+                (topic, transaction) -> {
+                    TxnID txnID = transaction.getTxnID();
+                    PulsarCommittable committable = new PulsarCommittable(txnID, topic);
+                    committables.add(committable);
+                });
+
+        clearTransactions();
+        return committables;
+    }
+
+    /**
+     * Flush all the messages buffered in the client and wait until all messages have been
+     * successfully persisted.
+     */
+    public void flush() throws IOException {
+        for (Producer<?> producer : producerRegister.values()) {
+            producer.flush();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        try (Closer closer = Closer.create()) {
+            // Flush all the pending messages to Pulsar. This wouldn't cause exception.
+            closer.register(this::flush);
+
+            // Abort all the existed transactions.
+            closer.register(this::abortTransactions);

Review comment:
       What happens with transactions that the Committer is still trying to commit?

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextAdapter;
+import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.util.IOUtils.closeAll;
+
+/**
+ * This class is responsible to write records in a Pulsar topic and to handle the different delivery
+ * {@link DeliveryGuarantee}s.
+ *
+ * @param <IN> The type of the input elements.
+ */
+public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommittable> {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class);
+
+    private final SinkConfiguration sinkConfiguration;
+    private final DeliveryGuarantee deliveryGuarantee;
+    private final PulsarSerializationSchema<IN> serializationSchema;
+    private final TopicRouter<IN> topicRouter;
+    private final PulsarSinkContextAdapter sinkContextAdapter;
+    private final TopicMetadataListener metadataListener;
+    private final MailboxExecutor mailboxExecutor;
+    private final TopicProducerRegister producerRegister;
+    private final Semaphore pendingMessages;
+
+    /**
+     * Constructor creating a Pulsar writer.
+     *
+     * <p>It will throw a {@link RuntimeException} if {@link
+     * PulsarSerializationSchema#open(InitializationContext, PulsarSinkContext, SinkConfiguration)}
+     * fails.
+     *
+     * @param sinkConfiguration the configuration to configure the Pulsar producer.
+     * @param serializationSchema serialize to transform the incoming records to {@link RawMessage}.
+     * @param metadataListener the listener for querying topic metadata.
+     * @param topicRouterProvider create related topic router to choose topic by incoming records.
+     * @param initContext context to provide information about the runtime environment.
+     */
+    public PulsarWriter(
+            SinkConfiguration sinkConfiguration,
+            PulsarSerializationSchema<IN> serializationSchema,
+            TopicMetadataListener metadataListener,
+            SerializableFunction<SinkConfiguration, TopicRouter<IN>> topicRouterProvider,
+            InitContext initContext) {
+        this.sinkConfiguration = sinkConfiguration;
+        this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
+        this.serializationSchema = serializationSchema;
+        this.topicRouter = topicRouterProvider.apply(sinkConfiguration);
+        this.sinkContextAdapter = new PulsarSinkContextAdapter(initContext, sinkConfiguration);
+        this.metadataListener = metadataListener;
+        this.mailboxExecutor = initContext.getMailboxExecutor();
+
+        // Initialize topic metadata listener.
+        LOG.debug("Initialize topic metadata after creating Pulsar writer.");
+        ProcessingTimeService timeService = initContext.getProcessingTimeService();
+        metadataListener.open(sinkConfiguration, timeService);
+
+        // Initialize topic router.
+        topicRouter.open(sinkConfiguration);
+
+        // Initialize the serialization schema.
+        try {
+            InitializationContext initializationContext =
+                    initContext.asSerializationSchemaInitializationContext();
+            serializationSchema.open(initializationContext, sinkContextAdapter, sinkConfiguration);
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Cannot initialize schema.", e);
+        }
+
+        // Create this producer register after opening serialization schema!
+        this.producerRegister = new TopicProducerRegister(sinkConfiguration, serializationSchema);
+        this.pendingMessages = new Semaphore(sinkConfiguration.getMaxPendingMessages());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void write(IN element, Context context) throws IOException, InterruptedException {
+        // Serialize the incoming element.
+        sinkContextAdapter.updateTimestamp(context);
+        RawMessage<byte[]> message = serializationSchema.serialize(element, sinkContextAdapter);
+
+        // Choose the right topic to send.
+        List<String> availableTopics = metadataListener.availableTopics();
+        String topic = topicRouter.route(element, message, availableTopics, sinkContextAdapter);
+
+        // Create message builder for sending message.
+        TypedMessageBuilder<?> builder = createMessageBuilder(topic, deliveryGuarantee);
+        if (sinkConfiguration.isEnableSchemaEvolution()) {
+            ((TypedMessageBuilder<IN>) builder).value(element);
+        } else {
+            ((TypedMessageBuilder<byte[]>) builder).value(message.getValue());
+        }
+        message.supplement(builder);
+
+        // Perform message sending.
+        if (deliveryGuarantee == DeliveryGuarantee.NONE) {
+            // We would just ignore the sending exception. This may cause data loss.
+            builder.sendAsync();
+        } else {
+            // Waiting for permits to write message.
+            pendingMessages.acquire();
+            CompletableFuture<MessageId> sender = builder.sendAsync();
+            sender.whenComplete(
+                    (id, ex) -> {
+                        pendingMessages.release();

Review comment:
       Do we really need the semaphore here? You could simply count the in-flight requests and update that counter only from the mailbox thread.
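
A minimal sketch of the counter idea, under stated assumptions: `MiniMailbox` is a hypothetical stand-in for Flink's `MailboxExecutor` (it is not the real API), and `PendingRequestCounter` and `PendingCounterDemo` are likewise made up for illustration. Completion callbacks only enqueue a mail; the counter is read and written on the task thread alone, so it needs neither a `Semaphore` nor any other synchronization.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for a mailbox: mails are queued from any thread, run on one thread. */
final class MiniMailbox {
    private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();

    /** Thread-safe: may be called from a client callback thread. */
    void execute(Runnable mail) {
        mails.add(mail);
    }

    /** Task thread only: blocks until one mail is available, then runs it. */
    void runOneMail() {
        try {
            mails.take().run();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a mail", e);
        }
    }
}

/** Hypothetical helper: counts in-flight sends with a plain int confined to the task thread. */
final class PendingRequestCounter {
    private final MiniMailbox mailbox;
    private final int maxPending;
    private int pending;

    PendingRequestCounter(MiniMailbox mailbox, int maxPending) {
        this.mailbox = mailbox;
        this.maxPending = maxPending;
    }

    /** Task thread only: waits for completions while the limit is reached, then counts up. */
    void beforeSend() {
        while (pending >= maxPending) {
            mailbox.runOneMail();
        }
        pending++;
    }

    /** The callback may run on any thread, so it only enqueues the decrement as a mail. */
    void track(CompletableFuture<?> send) {
        send.whenComplete(
                (id, ex) ->
                        mailbox.execute(
                                () -> {
                                    pending--;
                                    if (ex != null) {
                                        throw new IllegalStateException("Send failed", ex);
                                    }
                                }));
    }

    /** Task thread only, e.g. from flush(): runs mails until nothing is in flight. */
    void awaitAll() {
        while (pending > 0) {
            mailbox.runOneMail();
        }
    }

    int pending() {
        return pending;
    }
}

final class PendingCounterDemo {
    public static void main(String[] args) {
        MiniMailbox mailbox = new MiniMailbox();
        PendingRequestCounter counter = new PendingRequestCounter(mailbox, 2);
        CompletableFuture<String> first = new CompletableFuture<>();
        counter.beforeSend();
        counter.track(first);
        first.complete("message-id-1"); // only enqueues the decrement
        counter.awaitAll(); // the decrement runs here, on the calling thread
        System.out.println("in flight: " + counter.pending());
    }
}
```

In `write(...)` this would amount to calling `beforeSend()` before `sendAsync()` and `track(...)` on the returned future, and `flush(...)` would call `awaitAll()`. A failed send surfaces on the task thread when its mail runs, which matches how the PR already reports send failures through the mailbox.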

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextAdapter;
+import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.util.IOUtils.closeAll;
+
+/**
+ * This class is responsible to write records in a Pulsar topic and to handle the different delivery
+ * {@link DeliveryGuarantee}s.
+ *
+ * @param <IN> The type of the input elements.
+ */
+public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommittable> {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class);
+
+    private final SinkConfiguration sinkConfiguration;
+    private final DeliveryGuarantee deliveryGuarantee;
+    private final PulsarSerializationSchema<IN> serializationSchema;
+    private final TopicRouter<IN> topicRouter;
+    private final PulsarSinkContextAdapter sinkContextAdapter;
+    private final TopicMetadataListener metadataListener;
+    private final MailboxExecutor mailboxExecutor;
+    private final TopicProducerRegister producerRegister;
+    private final Semaphore pendingMessages;
+
+    /**
+     * Constructor creating a Pulsar writer.
+     *
+     * <p>It will throw a {@link RuntimeException} if {@link
+     * PulsarSerializationSchema#open(InitializationContext, PulsarSinkContext, SinkConfiguration)}
+     * fails.
+     *
+     * @param sinkConfiguration the configuration to configure the Pulsar producer.
+     * @param serializationSchema serialize to transform the incoming records to {@link RawMessage}.
+     * @param metadataListener the listener for querying topic metadata.
+     * @param topicRouterProvider create related topic router to choose topic by incoming records.
+     * @param initContext context to provide information about the runtime environment.
+     */
+    public PulsarWriter(
+            SinkConfiguration sinkConfiguration,
+            PulsarSerializationSchema<IN> serializationSchema,
+            TopicMetadataListener metadataListener,
+            SerializableFunction<SinkConfiguration, TopicRouter<IN>> topicRouterProvider,
+            InitContext initContext) {
+        this.sinkConfiguration = sinkConfiguration;
+        this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
+        this.serializationSchema = serializationSchema;
+        this.topicRouter = topicRouterProvider.apply(sinkConfiguration);
+        this.sinkContextAdapter = new PulsarSinkContextAdapter(initContext, sinkConfiguration);
+        this.metadataListener = metadataListener;
+        this.mailboxExecutor = initContext.getMailboxExecutor();
+
+        // Initialize topic metadata listener.
+        LOG.debug("Initialize topic metadata after creating Pulsar writer.");
+        ProcessingTimeService timeService = initContext.getProcessingTimeService();
+        metadataListener.open(sinkConfiguration, timeService);
+
+        // Initialize topic router.
+        topicRouter.open(sinkConfiguration);
+
+        // Initialize the serialization schema.
+        try {
+            InitializationContext initializationContext =
+                    initContext.asSerializationSchemaInitializationContext();
+            serializationSchema.open(initializationContext, sinkContextAdapter, sinkConfiguration);
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Cannot initialize schema.", e);
+        }
+
+        // Create this producer register after opening serialization schema!
+        this.producerRegister = new TopicProducerRegister(sinkConfiguration, serializationSchema);
+        this.pendingMessages = new Semaphore(sinkConfiguration.getMaxPendingMessages());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void write(IN element, Context context) throws IOException, InterruptedException {
+        // Serialize the incoming element.
+        sinkContextAdapter.updateTimestamp(context);
+        RawMessage<byte[]> message = serializationSchema.serialize(element, sinkContextAdapter);
+
+        // Choose the right topic to send.
+        List<String> availableTopics = metadataListener.availableTopics();
+        String topic = topicRouter.route(element, message, availableTopics, sinkContextAdapter);
+
+        // Create message builder for sending message.
+        TypedMessageBuilder<?> builder = createMessageBuilder(topic, deliveryGuarantee);

Review comment:
       I am not familiar enough with Pulsar, but what happens with pending transactions if the job fails before the committables in the committer are snapshotted?

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextAdapter;
+import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.util.IOUtils.closeAll;
+
+/**
+ * This class is responsible to write records in a Pulsar topic and to handle the different delivery
+ * {@link DeliveryGuarantee}s.
+ *
+ * @param <IN> The type of the input elements.
+ */
+public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommittable> {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class);
+
+    private final SinkConfiguration sinkConfiguration;
+    private final DeliveryGuarantee deliveryGuarantee;
+    private final PulsarSerializationSchema<IN> serializationSchema;
+    private final TopicRouter<IN> topicRouter;
+    private final PulsarSinkContextAdapter sinkContextAdapter;
+    private final TopicMetadataListener metadataListener;
+    private final MailboxExecutor mailboxExecutor;
+    private final TopicProducerRegister producerRegister;
+    private final Semaphore pendingMessages;
+
+    /**
+     * Constructor creating a Pulsar writer.
+     *
+     * <p>It will throw a {@link RuntimeException} if {@link
+     * PulsarSerializationSchema#open(InitializationContext, PulsarSinkContext, SinkConfiguration)}
+     * fails.
+     *
+     * @param sinkConfiguration the configuration to configure the Pulsar producer.
+     * @param serializationSchema serialize to transform the incoming records to {@link RawMessage}.
+     * @param metadataListener the listener for querying topic metadata.
+     * @param topicRouterProvider create related topic router to choose topic by incoming records.
+     * @param initContext context to provide information about the runtime environment.
+     */
+    public PulsarWriter(
+            SinkConfiguration sinkConfiguration,
+            PulsarSerializationSchema<IN> serializationSchema,
+            TopicMetadataListener metadataListener,
+            SerializableFunction<SinkConfiguration, TopicRouter<IN>> topicRouterProvider,
+            InitContext initContext) {
+        this.sinkConfiguration = sinkConfiguration;
+        this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
+        this.serializationSchema = serializationSchema;
+        this.topicRouter = topicRouterProvider.apply(sinkConfiguration);
+        this.sinkContextAdapter = new PulsarSinkContextAdapter(initContext, sinkConfiguration);
+        this.metadataListener = metadataListener;
+        this.mailboxExecutor = initContext.getMailboxExecutor();

Review comment:
       Nit: please `checkNotNull` the constructor arguments.
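
A minimal sketch of the requested fail-fast checks. In the connector this would presumably be Flink's `Preconditions.checkNotNull`, which `PulsarCommitter` in this PR already imports statically; to stay self-contained, the sketch uses the JDK's `Objects.requireNonNull` instead. `NullCheckedWriter` and `NullCheckDemo` are hypothetical, and plain `String`s stand in for the real collaborator types.

```java
import java.util.Objects;

/** Hypothetical, trimmed-down writer; Strings stand in for the real collaborator types. */
final class NullCheckedWriter {
    private final String sinkConfiguration;
    private final String serializationSchema;

    NullCheckedWriter(String sinkConfiguration, String serializationSchema) {
        // Validate before any argument is dereferenced, so a null fails here with the
        // argument's name instead of surfacing later as an unexplained NullPointerException.
        this.sinkConfiguration = Objects.requireNonNull(sinkConfiguration, "sinkConfiguration");
        this.serializationSchema =
                Objects.requireNonNull(serializationSchema, "serializationSchema");
    }

    String describe() {
        return sinkConfiguration + "/" + serializationSchema;
    }
}

final class NullCheckDemo {
    public static void main(String[] args) {
        System.out.println(new NullCheckedWriter("config", "schema").describe());
        try {
            new NullCheckedWriter("config", null);
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The check matters most for `sinkConfiguration`, because the constructor calls `sinkConfiguration.getDeliveryGuarantee()` on its second line.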

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextAdapter;
+import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+import org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.util.IOUtils.closeAll;
+
+/**
+ * This class is responsible to write records in a Pulsar topic and to handle the different delivery
+ * {@link DeliveryGuarantee}s.
+ *
+ * @param <IN> The type of the input elements.
+ */
+public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommittable> {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class);
+
+    private final SinkConfiguration sinkConfiguration;
+    private final DeliveryGuarantee deliveryGuarantee;
+    private final PulsarSerializationSchema<IN> serializationSchema;
+    private final TopicRouter<IN> topicRouter;
+    private final PulsarSinkContextAdapter sinkContextAdapter;
+    private final TopicMetadataListener metadataListener;
+    private final MailboxExecutor mailboxExecutor;
+    private final TopicProducerRegister producerRegister;
+    private final Semaphore pendingMessages;
+
+    /**
+     * Constructor creating a Pulsar writer.
+     *
+     * <p>It will throw a {@link RuntimeException} if {@link
+     * PulsarSerializationSchema#open(InitializationContext, PulsarSinkContext, SinkConfiguration)}
+     * fails.
+     *
+     * @param sinkConfiguration the configuration to configure the Pulsar producer.
+     * @param serializationSchema serialize to transform the incoming records to {@link RawMessage}.
+     * @param metadataListener the listener for querying topic metadata.
+     * @param topicRouterProvider create related topic router to choose topic by incoming records.
+     * @param initContext context to provide information about the runtime environment.
+     */
+    public PulsarWriter(
+            SinkConfiguration sinkConfiguration,
+            PulsarSerializationSchema<IN> serializationSchema,
+            TopicMetadataListener metadataListener,
+            SerializableFunction<SinkConfiguration, TopicRouter<IN>> topicRouterProvider,
+            InitContext initContext) {
+        this.sinkConfiguration = sinkConfiguration;
+        this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
+        this.serializationSchema = serializationSchema;
+        this.topicRouter = topicRouterProvider.apply(sinkConfiguration);
+        this.sinkContextAdapter = new PulsarSinkContextAdapter(initContext, sinkConfiguration);
+        this.metadataListener = metadataListener;
+        this.mailboxExecutor = initContext.getMailboxExecutor();
+
+        // Initialize topic metadata listener.
+        LOG.debug("Initialize topic metadata after creating Pulsar writer.");
+        ProcessingTimeService timeService = initContext.getProcessingTimeService();
+        metadataListener.open(sinkConfiguration, timeService);
+
+        // Initialize topic router.
+        topicRouter.open(sinkConfiguration);
+
+        // Initialize the serialization schema.
+        try {
+            InitializationContext initializationContext =
+                    initContext.asSerializationSchemaInitializationContext();
+            serializationSchema.open(initializationContext, sinkContextAdapter, sinkConfiguration);
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Cannot initialize schema.", e);
+        }
+
+        // Create this producer register after opening serialization schema!
+        this.producerRegister = new TopicProducerRegister(sinkConfiguration, serializationSchema);
+        this.pendingMessages = new Semaphore(sinkConfiguration.getMaxPendingMessages());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void write(IN element, Context context) throws IOException, InterruptedException {
+        // Serialize the incoming element.
+        sinkContextAdapter.updateTimestamp(context);
+        RawMessage<byte[]> message = serializationSchema.serialize(element, sinkContextAdapter);
+
+        // Choose the right topic to send.
+        List<String> availableTopics = metadataListener.availableTopics();
+        String topic = topicRouter.route(element, message, availableTopics, sinkContextAdapter);
+
+        // Create message builder for sending message.
+        TypedMessageBuilder<?> builder = createMessageBuilder(topic, deliveryGuarantee);
+        if (sinkConfiguration.isEnableSchemaEvolution()) {
+            ((TypedMessageBuilder<IN>) builder).value(element);
+        } else {
+            ((TypedMessageBuilder<byte[]>) builder).value(message.getValue());
+        }
+        message.supplement(builder);
+
+        // Perform message sending.
+        if (deliveryGuarantee == DeliveryGuarantee.NONE) {
+            // We would just ignore the sending exception. This may cause data loss.
+            builder.sendAsync();
+        } else {
+            // Waiting for permits to write message.
+            pendingMessages.acquire();
+            CompletableFuture<MessageId> sender = builder.sendAsync();
+            sender.whenComplete(
+                    (id, ex) -> {
+                        pendingMessages.release();
+                        if (ex != null) {
+                            mailboxExecutor.execute(
+                                    () -> {
+                                        throw new FlinkRuntimeException(
+                                                "Failed to send data to Pulsar " + topic, ex);
+                                    },
+                                    "Failed to send message to Pulsar");
+                        } else {
+                            LOG.debug("Sent message to Pulsar {} with message id {}", topic, id);
+                        }
+                    });
+        }
+    }
+
+    private TypedMessageBuilder<?> createMessageBuilder(
+            String topic, DeliveryGuarantee deliveryGuarantee) {
+        Producer<?> producer = producerRegister.getOrCreateProducer(topic);
+        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+            Transaction transaction = producerRegister.getOrCreateTransaction(topic);
+            return producer.newMessage(transaction);
+        } else {
+            return producer.newMessage();
+        }
+    }
+
+    @Override
+    public void flush(boolean endOfInput) throws IOException {
+        while (pendingMessages.availablePermits() < sinkConfiguration.getMaxPendingMessages()) {

Review comment:
       I think you need to flush here if `endOfInput = true`; otherwise you might lose records when the job finishes.
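
One possible reading of this suggestion, as a minimal sketch: `BufferingProducer`, `EndOfInputFlushingWriter` and `EndOfInputDemo` are hypothetical stand-ins rather than classes from this PR, and the boolean `deliveryGuaranteed` stands in for "the delivery guarantee is not `NONE`". The point is only that `endOfInput` forces a flush even when no guarantee is configured.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical producer that buffers records in memory until flush() is called. */
final class BufferingProducer {
    private final List<String> buffered = new ArrayList<>();
    private final List<String> persisted = new ArrayList<>();

    void send(String record) {
        buffered.add(record);
    }

    void flush() {
        persisted.addAll(buffered);
        buffered.clear();
    }

    List<String> persisted() {
        return persisted;
    }
}

/** Hypothetical writer: flushes when a guarantee is set, and always at end of input. */
final class EndOfInputFlushingWriter {
    private final BufferingProducer producer;
    private final boolean deliveryGuaranteed;

    EndOfInputFlushingWriter(BufferingProducer producer, boolean deliveryGuaranteed) {
        this.producer = producer;
        this.deliveryGuaranteed = deliveryGuaranteed;
    }

    void write(String record) {
        producer.send(record);
    }

    void flush(boolean endOfInput) {
        // Without the endOfInput check, a writer with no delivery guarantee could
        // finish the job while records are still sitting in the producer's buffer.
        if (endOfInput || deliveryGuaranteed) {
            producer.flush();
        }
    }
}

final class EndOfInputDemo {
    public static void main(String[] args) {
        BufferingProducer producer = new BufferingProducer();
        EndOfInputFlushingWriter writer = new EndOfInputFlushingWriter(producer, false);
        writer.write("record-1");
        writer.flush(false);
        System.out.println("persisted at checkpoint: " + producer.persisted().size());
        writer.flush(true);
        System.out.println("persisted at end of input: " + producer.persisted().size());
    }
}
```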

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommitter.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.committer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils;
+import org.apache.flink.connector.pulsar.sink.PulsarSink;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.CoordinatorNotFoundException;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.InvalidTxnStatusException;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_ASSIGN;
+
+/**
+ * Committer implementation for {@link PulsarSink}.
+ *
+ * <p>The committer is responsible for finalizing the Pulsar transactions by committing them.
+ */
+@Internal
+public class PulsarCommitter implements Committer<PulsarCommittable>, Closeable {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarCommitter.class);
+
+    private final SinkConfiguration sinkConfiguration;
+
+    private PulsarClient pulsarClient;
+    private TransactionCoordinatorClient coordinatorClient;
+
+    public PulsarCommitter(SinkConfiguration sinkConfiguration) {
+        this.sinkConfiguration = sinkConfiguration;
+    }
+
+    @Override
+    public void commit(Collection<CommitRequest<PulsarCommittable>> requests)
+            throws IOException, InterruptedException {
+        TransactionCoordinatorClient client = transactionCoordinatorClient();
+
+        for (CommitRequest<PulsarCommittable> request : requests) {
+            PulsarCommittable committable = request.getCommittable();
+            TxnID txnID = committable.getTxnID();
+            String topic = committable.getTopic();
+
+            LOG.debug("Start committing the Pulsar transaction {} for topic {}", txnID, topic);
+            try {
+                client.commit(txnID);
+            } catch (TransactionCoordinatorClientException e) {
+                // This is a known bug for Pulsar Transaction. We have to use instanceof.
+                TransactionCoordinatorClientException ex = PulsarTransactionUtils.unwrap(e);
+                if (ex instanceof CoordinatorNotFoundException) {
+                    LOG.error(
+                            "We couldn't find the Transaction Coordinator from Pulsar broker {}. "
+                                    + "Check your broker configuration.",
+                            committable,
+                            ex);
+                    request.signalFailedWithKnownReason(ex);
+                } else if (ex instanceof InvalidTxnStatusException) {
+                    LOG.error(
+                            "Unable to commit transaction ({}) because it's in an invalid state. "
+                                    + "Most likely the transaction has been aborted for some reason. "
+                                    + "Please check the Pulsar broker logs for more details.",
+                            committable,
+                            ex);
+                    request.signalAlreadyCommitted();
+                } else if (ex instanceof TransactionNotFoundException) {
+                    if (request.getNumberOfRetries() == 0) {
+                        LOG.error(
+                                "Unable to commit transaction ({}) because it's not found on Pulsar broker. "
+                                        + "Most likely the checkpoint interval exceeds the transaction timeout.",
+                                committable,
+                                ex);
+                        request.signalFailedWithKnownReason(ex);
+                    } else {
+                        request.signalAlreadyCommitted();

Review comment:
       This case feels strange. Is it really possible that the commit succeeded even though it failed earlier and was retried?
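
For reference, the only sequence I can think of in which this branch would be correct is sketched below. It is purely hypothetical: `FakeCoordinator`, `Response`, and `Decision` are made-up names, and the model assumes that the coordinator applies a commit, loses the acknowledgement, and no longer knows the transaction when the retry arrives. Whether the real Pulsar coordinator can behave like this is exactly the open question.

```java
import java.util.HashSet;
import java.util.Set;

public class CommitRetrySketch {

    enum Response { OK, TIMEOUT, NOT_FOUND }

    enum Decision { COMMITTED, RETRY_LATER, FAILED, ALREADY_COMMITTED }

    /** Hypothetical coordinator that forgets a transaction once it is committed. */
    static final class FakeCoordinator {
        private final Set<String> openTransactions = new HashSet<>();
        private boolean dropNextAck;

        void open(String txnId) {
            openTransactions.add(txnId);
        }

        void dropNextAck() {
            dropNextAck = true;
        }

        Response commit(String txnId) {
            if (!openTransactions.remove(txnId)) {
                return Response.NOT_FOUND;
            }
            if (dropNextAck) {
                // The commit was applied, but the caller never learns about it.
                dropNextAck = false;
                return Response.TIMEOUT;
            }
            return Response.OK;
        }
    }

    /** Mirrors the branch under review: "not found" is only fatal on the first attempt. */
    static Decision decide(Response response, int numberOfRetries) {
        switch (response) {
            case OK:
                return Decision.COMMITTED;
            case TIMEOUT:
                return Decision.RETRY_LATER;
            case NOT_FOUND:
                return numberOfRetries == 0 ? Decision.FAILED : Decision.ALREADY_COMMITTED;
            default:
                throw new IllegalArgumentException("Unknown response: " + response);
        }
    }

    public static void main(String[] args) {
        FakeCoordinator coordinator = new FakeCoordinator();
        coordinator.open("txn-1");
        coordinator.dropNextAck();

        Decision first = decide(coordinator.commit("txn-1"), 0);
        Decision second = decide(coordinator.commit("txn-1"), 1);
        System.out.println(first + " then " + second);
    }
}
```

If a retry can also follow an attempt that genuinely failed, treating "not found" as already committed would hide data loss, so the condition probably needs more than `getNumberOfRetries()` to be safe.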



