fapaul commented on a change in pull request #17452:
URL: https://github.com/apache/flink/pull/17452#discussion_r798579396
##########
File path:
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
##########
@@ -113,6 +113,11 @@
org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(org.apache.flink.ap
org.apache.flink.connector.kafka.sink.KafkaSink.getCommittableSerializer():
Returned leaf type org.apache.flink.connector.kafka.sink.KafkaCommittable does
not satisfy: reside outside of package 'org.apache.flink..' or annotated with
@Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.kafka.sink.KafkaSink.getWriterStateSerializer():
Returned leaf type org.apache.flink.connector.kafka.sink.KafkaWriterState does
not satisfy: reside outside of package 'org.apache.flink..' or annotated with
@Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.getPartitionOffsets(java.util.Collection,
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer$PartitionOffsetsRetriever):
Argument leaf type
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer$PartitionOffsetsRetriever
does not satisfy: reside outside of package 'org.apache.flink..' or annotated
with @Public or annotated with @PublicEvolving or annotated with @Deprecated
+org.apache.flink.connector.pulsar.sink.PulsarSink.createCommitter(): Returned
leaf type org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable
does not satisfy: reside outside of package 'org.apache.flink..' or annotated
with @Public or annotated with @PublicEvolving or annotated with @Deprecated
+org.apache.flink.connector.pulsar.sink.PulsarSink.createWriter(org.apache.flink.api.connector.sink2.Sink$InitContext):
Returned leaf type
org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable does not
satisfy: reside outside of package 'org.apache.flink..' or annotated with
@Public or annotated with @PublicEvolving or annotated with @Deprecated
+org.apache.flink.connector.pulsar.sink.PulsarSink.getCommittableSerializer():
Returned leaf type
org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable does not
satisfy: reside outside of package 'org.apache.flink..' or annotated with
@Public or annotated with @PublicEvolving or annotated with @Deprecated
+org.apache.flink.connector.pulsar.sink.writer.message.MessageKeyHash.getDescription():
Returned leaf type org.apache.flink.configuration.description.InlineElement
does not satisfy: reside outside of package 'org.apache.flink..' or annotated
with @Public or annotated with @PublicEvolving or annotated with @Deprecated
+org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode.getDescription():
Returned leaf type org.apache.flink.configuration.description.InlineElement
does not satisfy: reside outside of package 'org.apache.flink..' or annotated
with @Public or annotated with @PublicEvolving or annotated with @Deprecated
Review comment:
I do not think we should add new violations.
i.e. For the sink-related methods, you can `@Internal` annotation to the
creation of the writer and committer.
##########
File path:
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfiguration.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.config;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+
+/**
+ * An unmodifiable {@link Configuration} for Pulsar. We provide extra method
for building the
+ * different Pulsar client instance.
+ */
+public abstract class PulsarConfiguration extends UnmodifiableConfiguration {
Review comment:
I like the idea of providing a dedicated connector configuration. It
definitely makes the DataStream to Table transition easier.
##########
File path:
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSchemaWrapper.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.serializer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.pulsar.common.schema.PulsarSchema;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage;
+
+import org.apache.pulsar.client.api.Schema;
+
+/** Wrap the Pulsar's Schema into PulsarSerializationSchema. */
+@Internal
+public class PulsarSchemaWrapper<IN> implements PulsarSerializationSchema<IN> {
+ private static final long serialVersionUID = -2567052498398184194L;
+
+ private static final byte[] EMPTY_BYTES = new byte[0];
+ private final PulsarSchema<IN> pulsarSchema;
+
+ public PulsarSchemaWrapper(PulsarSchema<IN> pulsarSchema) {
+ this.pulsarSchema = pulsarSchema;
+ }
+
+ @Override
+ public RawMessage<byte[]> serialize(IN element, PulsarSinkContext
sinkContext) {
+ RawMessage<byte[]> message;
+
+ if (sinkContext.isEnableSchemaEvolution()) {
+ // We don't need to serialize incoming records in schema evolution.
+ message = new RawMessage<>(EMPTY_BYTES);
Review comment:
Can you explain this?
##########
File path:
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/MessageKeyHash.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.message;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
+
+import org.apache.pulsar.client.impl.Hash;
+import org.apache.pulsar.client.impl.JavaStringHash;
+import org.apache.pulsar.client.impl.Murmur3_32Hash;
+
+import static org.apache.flink.configuration.description.LinkElement.link;
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/** Predefined the available hash function for routing the message. */
+@PublicEvolving
Review comment:
Should users interact with this class?
##########
File path:
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.topic;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static
org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
+import static
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+import static
org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction;
+import static
org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils.createProducerBuilder;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * All the Pulsar Producer share the same Client, but self hold the queue for
specified topic. So we
+ * have to create different instance for different topic.
+ */
+@Internal
+public class TopicProducerRegister implements Closeable {
+
+ private final PulsarClient pulsarClient;
+ private final SinkConfiguration sinkConfiguration;
+ private final Schema<?> schema;
+ private final ConcurrentHashMap<String, Producer<?>> producerRegister;
+ private final ConcurrentHashMap<String, Transaction> transactionRegister;
Review comment:
Do you need the synchronization here?
##########
File path:
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import
org.apache.flink.connector.pulsar.sink.committer.PulsarCommittableSerializer;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommitter;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.PulsarWriter;
+import org.apache.flink.connector.pulsar.sink.writer.router.KeyHashTopicRouter;
+import
org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+import
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import
org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.function.SerializableFunction;
+
+/**
+ * The Sink implementation of Pulsar. Please use a {@link PulsarSinkBuilder}
to construct a {@link
+ * PulsarSink}. The following example shows how to create a PulsarSink
receiving records of {@code
+ * String} type.
+ *
+ * <pre>{@code
+ * PulsarSink<String> sink = PulsarSink.builder()
+ * .setServiceUrl(operator().serviceUrl())
+ * .setAdminUrl(operator().adminUrl())
+ * .setTopic(topic)
+ *
.setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING))
+ * .build();
+ * }</pre>
+ *
+ * <p>The sink supports all delivery guarantees described by {@link
DeliveryGuarantee}.
+ *
+ * <ul>
+ * <li>{@link DeliveryGuarantee#NONE} does not provide any guarantees:
messages may be lost in
+ * case of issues on the Pulsar broker and messages may be duplicated in
case of a Flink
+ * failure.
+ * <li>{@link DeliveryGuarantee#AT_LEAST_ONCE} the sink will wait for all
outstanding records in
+ * the Pulsar buffers to be acknowledged by the Pulsar producer on a
checkpoint. No messages
+ * will be lost in case of any issue with the Pulsar brokers but
messages may be duplicated
+ * when Flink restarts.
+ * <li>{@link DeliveryGuarantee#EXACTLY_ONCE}: In this mode the PulsarSink
will write all messages
+ * in a Pulsar transaction that will be committed to Pulsar on a
checkpoint. Thus, no
+ * duplicates will be seen in case of a Flink restart. However, this
delays record writing
+ * effectively until a checkpoint is written, so adjust the checkpoint
duration accordingly.
+ * Additionally, it is highly recommended to tweak Pulsar transaction
timeout (link) >>
+ * maximum checkpoint duration + maximum restart duration or data loss
may happen when Pulsar
+ * expires an uncommitted transaction.
+ * </ul>
+ *
+ * <p>See {@link PulsarSinkBuilder} for more details.
+ *
+ * @param <IN> The input type of the sink.
+ */
+@PublicEvolving
+public class PulsarSink<IN> implements TwoPhaseCommittingSink<IN,
PulsarCommittable> {
+ private static final long serialVersionUID = 4416714587951282119L;
+
+ private final SinkConfiguration sinkConfiguration;
+ private final PulsarSerializationSchema<IN> serializationSchema;
+ private final TopicMetadataListener metadataListener;
+ private final SerializableFunction<SinkConfiguration, TopicRouter<IN>>
topicRouterProvider;
+
+ PulsarSink(
+ SinkConfiguration sinkConfiguration,
+ PulsarSerializationSchema<IN> serializationSchema,
+ TopicMetadataListener metadataListener,
+ TopicRoutingMode topicRoutingMode,
+ TopicRouter<IN> topicRouter) {
+ this.sinkConfiguration = sinkConfiguration;
+ this.serializationSchema = serializationSchema;
+ this.metadataListener = metadataListener;
Review comment:
Nit: please use `checkNotNull` for ctor inputs.
##########
File path:
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java
##########
@@ -18,101 +18,55 @@
package org.apache.flink.connector.pulsar.common.utils;
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.Transaction;
-import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import static
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
-/**
- * Transaction was introduced into pulsar since 2.7.0, but the interface
{@link Transaction} didn't
- * provide a id method until 2.8.1. We have to add this util for acquiring the
{@link TxnID} for
- * compatible consideration.
- *
- * <p>TODO Remove this hack after pulsar 2.8.1 release.
- */
-@Internal
-@SuppressWarnings("java:S3011")
+/** A suit of workarounds for the Pulsar Transaction. */
public final class PulsarTransactionUtils {
- private static volatile Field mostBitsField;
- private static volatile Field leastBitsField;
-
private PulsarTransactionUtils() {
// No public constructor
}
- public static TxnID getId(Transaction transaction) {
- // 2.8.1 and after.
- try {
- Method getId = Transaction.class.getDeclaredMethod("getTxnID");
- return (TxnID) getId.invoke(transaction);
- } catch (NoSuchMethodException | InvocationTargetException |
IllegalAccessException e) {
- // 2.8.0 and before.
- TransactionImpl impl = (TransactionImpl) transaction;
- Long txnIdMostBits = getTxnIdMostBits(impl);
- Long txnIdLeastBits = getTxnIdLeastBits(impl);
-
- checkNotNull(txnIdMostBits, "Failed to get txnIdMostBits");
- checkNotNull(txnIdLeastBits, "Failed to get txnIdLeastBits");
-
- return new TxnID(txnIdMostBits, txnIdLeastBits);
- }
- }
-
- private static Long getTxnIdMostBits(TransactionImpl transaction) {
- if (mostBitsField == null) {
- synchronized (PulsarTransactionUtils.class) {
- if (mostBitsField == null) {
- try {
- mostBitsField =
TransactionImpl.class.getDeclaredField("txnIdMostBits");
- mostBitsField.setAccessible(true);
- } catch (NoSuchFieldException e) {
- // Nothing to do for this exception.
- }
- }
- }
- }
+ public static Transaction createTransaction(PulsarClient pulsarClient,
long timeoutMs) {
- if (mostBitsField != null) {
- try {
- return (Long) mostBitsField.get(transaction);
- } catch (IllegalAccessException e) {
- // Nothing to do for this exception.
- }
+ try {
+ CompletableFuture<Transaction> future =
+ sneakyClient(pulsarClient::newTransaction)
+ .withTransactionTimeout(timeoutMs,
TimeUnit.MILLISECONDS)
+ .build();
+
+ return future.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ } catch (ExecutionException e) {
+ throw new FlinkRuntimeException(e);
}
-
- return null;
}
- private static Long getTxnIdLeastBits(TransactionImpl transaction) {
- if (leastBitsField == null) {
- synchronized (PulsarTransactionUtils.class) {
- if (leastBitsField == null) {
- try {
- leastBitsField =
TransactionImpl.class.getDeclaredField("txnIdLeastBits");
- leastBitsField.setAccessible(true);
- } catch (NoSuchFieldException e) {
- // Nothing to do for this exception.
- }
- }
+ /**
+ * This is a bug in original {@link
TransactionCoordinatorClientException#unwrap(Throwable)}
+ * method. Pulsar wraps the {@link ExecutionException} which hides the
read execution exception.
+ */
+ public static TransactionCoordinatorClientException unwrap(
+ TransactionCoordinatorClientException e) {
Review comment:
I think you can use `ExceptionUtils#findThrowable`
##########
File path:
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import
org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import
org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextAdapter;
+import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import
org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+import
org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.util.IOUtils.closeAll;
+
+/**
+ * This class is responsible to write records in a Pulsar topic and to handle
the different delivery
+ * {@link DeliveryGuarantee}s.
+ *
+ * @param <IN> The type of the input elements.
+ */
+public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN,
PulsarCommittable> {
Review comment:
Is this only public for the docstring in the `SinkConfiguration? If it
stays public it needs an annotation, probably `@Internal`
##########
File path:
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.topic;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static
org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
+import static
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+import static
org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction;
+import static
org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils.createProducerBuilder;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * All the Pulsar Producer share the same Client, but self hold the queue for
specified topic. So we
+ * have to create different instance for different topic.
+ */
+@Internal
+public class TopicProducerRegister implements Closeable {
+
+ private final PulsarClient pulsarClient;
+ private final SinkConfiguration sinkConfiguration;
+ private final Schema<?> schema;
+ private final ConcurrentHashMap<String, Producer<?>> producerRegister;
+ private final ConcurrentHashMap<String, Transaction> transactionRegister;
+
+ public TopicProducerRegister(
+ SinkConfiguration sinkConfiguration, PulsarSerializationSchema<?>
serializationSchema) {
+ this.pulsarClient = createClient(sinkConfiguration);
+ this.sinkConfiguration = sinkConfiguration;
+ if (sinkConfiguration.isEnableSchemaEvolution()) {
+ // Use this schema would send it to Pulsar and perform validation.
+ this.schema = serializationSchema.schema();
+ } else {
+ // We would serialize message by flink's policy and send byte
array to Pulsar.
+ this.schema = Schema.BYTES;
+ }
+ this.producerRegister = new ConcurrentHashMap<>();
+ this.transactionRegister = new ConcurrentHashMap<>();
+ }
+
+ /** Create or return the cached topic related producer. */
+ public Producer<?> getOrCreateProducer(String topic) {
+ return producerRegister.computeIfAbsent(
+ topic,
+ t -> {
+ ProducerBuilder<?> builder =
+ createProducerBuilder(pulsarClient, schema,
sinkConfiguration);
+ // Set the required topic name.
+ builder.topic(t);
+ return sneakyClient(builder::create);
+ });
+ }
+
+ /**
+ * Get the cached topic related transaction. Or create a new transaction
after checkpointing.
+ */
+ public Transaction getOrCreateTransaction(String topic) {
+ return transactionRegister.computeIfAbsent(
+ topic,
+ t -> {
+ long timeoutMillis =
sinkConfiguration.getTransactionTimeoutMillis();
+ return createTransaction(pulsarClient, timeoutMillis);
+ });
+ }
+
+ /** Abort the existed transactions. This method would be used when close
PulsarWriter. */
+ private void abortTransactions() {
+ if (transactionRegister.isEmpty()) {
+ return;
+ }
+
+ TransactionCoordinatorClient coordinatorClient =
+ ((PulsarClientImpl) pulsarClient).getTcClient();
+ // This null check is used for making sure transaction is enabled in
client.
+ checkNotNull(coordinatorClient);
+
+ try (Closer closer = Closer.create()) {
+ for (Transaction transaction : transactionRegister.values()) {
+ TxnID txnID = transaction.getTxnID();
+ closer.register(() -> coordinatorClient.abort(txnID));
+ }
+
+ clearTransactions();
+ } catch (IOException e) {
+ throw new FlinkRuntimeException(e);
+ }
+ }
+
+ /** Clean these transactions. All transactions should be passed to Pulsar
committer. */
+ private void clearTransactions() {
+ // Clear the transactions, we would create new transaction when new
message comes.
+ transactionRegister.clear();
+ }
+
+ /**
+ * Convert the transactions into committable list for Pulsar Committer.
The transactions would
+ * be removed until flink triggering a checkpoint.
+ */
+ public List<PulsarCommittable> prepareCommit() {
+ List<PulsarCommittable> committables = new
ArrayList<>(transactionRegister.size());
+ transactionRegister.forEach(
+ (topic, transaction) -> {
+ TxnID txnID = transaction.getTxnID();
+ PulsarCommittable committable = new
PulsarCommittable(txnID, topic);
+ committables.add(committable);
+ });
+
+ clearTransactions();
+ return committables;
+ }
+
+ /**
+ * Flush all the messages buffered in the client and wait until all
messages have been
+ * successfully persisted.
+ */
+ public void flush() throws IOException {
+ for (Producer<?> producer : producerRegister.values()) {
+ producer.flush();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try (Closer closer = Closer.create()) {
+ // Flush all the pending messages to Pulsar. This wouldn't cause
exception.
+ closer.register(this::flush);
+
+ // Abort all the existed transactions.
+ closer.register(this::abortTransactions);
Review comment:
What happens with transactions that the Committer is still trying to
commit?
##########
File path:
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import
org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import
org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextAdapter;
+import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import
org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+import
org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.util.IOUtils.closeAll;
+
+/**
+ * This class is responsible to write records in a Pulsar topic and to handle
the different delivery
+ * {@link DeliveryGuarantee}s.
+ *
+ * @param <IN> The type of the input elements.
+ */
+public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN,
PulsarCommittable> {
+ private static final Logger LOG =
LoggerFactory.getLogger(PulsarWriter.class);
+
+ private final SinkConfiguration sinkConfiguration;
+ private final DeliveryGuarantee deliveryGuarantee;
+ private final PulsarSerializationSchema<IN> serializationSchema;
+ private final TopicRouter<IN> topicRouter;
+ private final PulsarSinkContextAdapter sinkContextAdapter;
+ private final TopicMetadataListener metadataListener;
+ private final MailboxExecutor mailboxExecutor;
+ private final TopicProducerRegister producerRegister;
+ private final Semaphore pendingMessages;
+
+ /**
+ * Constructor creating a Pulsar writer.
+ *
+ * <p>It will throw a {@link RuntimeException} if {@link
+ * PulsarSerializationSchema#open(InitializationContext,
PulsarSinkContext, SinkConfiguration)}
+ * fails.
+ *
+ * @param sinkConfiguration the configuration to configure the Pulsar
producer.
+ * @param serializationSchema serialize to transform the incoming records
to {@link RawMessage}.
+ * @param metadataListener the listener for querying topic metadata.
+ * @param topicRouterProvider create related topic router to choose topic
by incoming records.
+ * @param initContext context to provide information about the runtime
environment.
+ */
+ public PulsarWriter(
+ SinkConfiguration sinkConfiguration,
+ PulsarSerializationSchema<IN> serializationSchema,
+ TopicMetadataListener metadataListener,
+ SerializableFunction<SinkConfiguration, TopicRouter<IN>>
topicRouterProvider,
+ InitContext initContext) {
+ this.sinkConfiguration = sinkConfiguration;
+ this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
+ this.serializationSchema = serializationSchema;
+ this.topicRouter = topicRouterProvider.apply(sinkConfiguration);
+ this.sinkContextAdapter = new PulsarSinkContextAdapter(initContext,
sinkConfiguration);
+ this.metadataListener = metadataListener;
+ this.mailboxExecutor = initContext.getMailboxExecutor();
+
+ // Initialize topic metadata listener.
+ LOG.debug("Initialize topic metadata after creating Pulsar writer.");
+ ProcessingTimeService timeService =
initContext.getProcessingTimeService();
+ metadataListener.open(sinkConfiguration, timeService);
+
+ // Initialize topic router.
+ topicRouter.open(sinkConfiguration);
+
+ // Initialize the serialization schema.
+ try {
+ InitializationContext initializationContext =
+ initContext.asSerializationSchemaInitializationContext();
+ serializationSchema.open(initializationContext,
sinkContextAdapter, sinkConfiguration);
+ } catch (Exception e) {
+ throw new FlinkRuntimeException("Cannot initialize schema.", e);
+ }
+
+ // Create this producer register after opening serialization schema!
+ this.producerRegister = new TopicProducerRegister(sinkConfiguration,
serializationSchema);
+ this.pendingMessages = new
Semaphore(sinkConfiguration.getMaxPendingMessages());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void write(IN element, Context context) throws IOException,
InterruptedException {
+ // Serialize the incoming element.
+ sinkContextAdapter.updateTimestamp(context);
+ RawMessage<byte[]> message = serializationSchema.serialize(element,
sinkContextAdapter);
+
+ // Choose the right topic to send.
+ List<String> availableTopics = metadataListener.availableTopics();
+ String topic = topicRouter.route(element, message, availableTopics,
sinkContextAdapter);
+
+ // Create message builder for sending message.
+ TypedMessageBuilder<?> builder = createMessageBuilder(topic,
deliveryGuarantee);
+ if (sinkConfiguration.isEnableSchemaEvolution()) {
+ ((TypedMessageBuilder<IN>) builder).value(element);
+ } else {
+ ((TypedMessageBuilder<byte[]>) builder).value(message.getValue());
+ }
+ message.supplement(builder);
+
+ // Perform message sending.
+ if (deliveryGuarantee == DeliveryGuarantee.NONE) {
+ // We would just ignore the sending exception. This may cause data
loss.
+ builder.sendAsync();
+ } else {
+ // Waiting for permits to write message.
+ pendingMessages.acquire();
+ CompletableFuture<MessageId> sender = builder.sendAsync();
+ sender.whenComplete(
+ (id, ex) -> {
+ pendingMessages.release();
Review comment:
Do we really need the semaphore here? I can imagine you can simply count
the number of requests and only update the counter in the mailbox.
##########
File path:
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import
org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import
org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextAdapter;
+import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import
org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+import
org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.util.IOUtils.closeAll;
+
+/**
+ * This class is responsible to write records in a Pulsar topic and to handle
the different delivery
+ * {@link DeliveryGuarantee}s.
+ *
+ * @param <IN> The type of the input elements.
+ */
+public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN,
PulsarCommittable> {
+ private static final Logger LOG =
LoggerFactory.getLogger(PulsarWriter.class);
+
+ private final SinkConfiguration sinkConfiguration;
+ private final DeliveryGuarantee deliveryGuarantee;
+ private final PulsarSerializationSchema<IN> serializationSchema;
+ private final TopicRouter<IN> topicRouter;
+ private final PulsarSinkContextAdapter sinkContextAdapter;
+ private final TopicMetadataListener metadataListener;
+ private final MailboxExecutor mailboxExecutor;
+ private final TopicProducerRegister producerRegister;
+ private final Semaphore pendingMessages;
+
+ /**
+ * Constructor creating a Pulsar writer.
+ *
+ * <p>It will throw a {@link RuntimeException} if {@link
+ * PulsarSerializationSchema#open(InitializationContext,
PulsarSinkContext, SinkConfiguration)}
+ * fails.
+ *
+ * @param sinkConfiguration the configuration to configure the Pulsar
producer.
+ * @param serializationSchema serialize to transform the incoming records
to {@link RawMessage}.
+ * @param metadataListener the listener for querying topic metadata.
+ * @param topicRouterProvider create related topic router to choose topic
by incoming records.
+ * @param initContext context to provide information about the runtime
environment.
+ */
+ public PulsarWriter(
+ SinkConfiguration sinkConfiguration,
+ PulsarSerializationSchema<IN> serializationSchema,
+ TopicMetadataListener metadataListener,
+ SerializableFunction<SinkConfiguration, TopicRouter<IN>>
topicRouterProvider,
+ InitContext initContext) {
+ this.sinkConfiguration = sinkConfiguration;
+ this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
+ this.serializationSchema = serializationSchema;
+ this.topicRouter = topicRouterProvider.apply(sinkConfiguration);
+ this.sinkContextAdapter = new PulsarSinkContextAdapter(initContext,
sinkConfiguration);
+ this.metadataListener = metadataListener;
+ this.mailboxExecutor = initContext.getMailboxExecutor();
+
+ // Initialize topic metadata listener.
+ LOG.debug("Initialize topic metadata after creating Pulsar writer.");
+ ProcessingTimeService timeService =
initContext.getProcessingTimeService();
+ metadataListener.open(sinkConfiguration, timeService);
+
+ // Initialize topic router.
+ topicRouter.open(sinkConfiguration);
+
+ // Initialize the serialization schema.
+ try {
+ InitializationContext initializationContext =
+ initContext.asSerializationSchemaInitializationContext();
+ serializationSchema.open(initializationContext,
sinkContextAdapter, sinkConfiguration);
+ } catch (Exception e) {
+ throw new FlinkRuntimeException("Cannot initialize schema.", e);
+ }
+
+ // Create this producer register after opening serialization schema!
+ this.producerRegister = new TopicProducerRegister(sinkConfiguration,
serializationSchema);
+ this.pendingMessages = new
Semaphore(sinkConfiguration.getMaxPendingMessages());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void write(IN element, Context context) throws IOException,
InterruptedException {
+ // Serialize the incoming element.
+ sinkContextAdapter.updateTimestamp(context);
+ RawMessage<byte[]> message = serializationSchema.serialize(element,
sinkContextAdapter);
+
+ // Choose the right topic to send.
+ List<String> availableTopics = metadataListener.availableTopics();
+ String topic = topicRouter.route(element, message, availableTopics,
sinkContextAdapter);
+
+ // Create message builder for sending message.
+ TypedMessageBuilder<?> builder = createMessageBuilder(topic,
deliveryGuarantee);
Review comment:
I am not familiar enough with Pulsar but happens with pending
transactions in case the job has failed before the committables in the
committer are snapshotted.
##########
File path:
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import
org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import
org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextAdapter;
+import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import
org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+import
org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.util.IOUtils.closeAll;
+
+/**
+ * This class is responsible to write records in a Pulsar topic and to handle
the different delivery
+ * {@link DeliveryGuarantee}s.
+ *
+ * @param <IN> The type of the input elements.
+ */
+public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN,
PulsarCommittable> {
+ private static final Logger LOG =
LoggerFactory.getLogger(PulsarWriter.class);
+
+ private final SinkConfiguration sinkConfiguration;
+ private final DeliveryGuarantee deliveryGuarantee;
+ private final PulsarSerializationSchema<IN> serializationSchema;
+ private final TopicRouter<IN> topicRouter;
+ private final PulsarSinkContextAdapter sinkContextAdapter;
+ private final TopicMetadataListener metadataListener;
+ private final MailboxExecutor mailboxExecutor;
+ private final TopicProducerRegister producerRegister;
+ private final Semaphore pendingMessages;
+
+ /**
+ * Constructor creating a Pulsar writer.
+ *
+ * <p>It will throw a {@link RuntimeException} if {@link
+ * PulsarSerializationSchema#open(InitializationContext,
PulsarSinkContext, SinkConfiguration)}
+ * fails.
+ *
+ * @param sinkConfiguration the configuration to configure the Pulsar
producer.
+ * @param serializationSchema serialize to transform the incoming records
to {@link RawMessage}.
+ * @param metadataListener the listener for querying topic metadata.
+ * @param topicRouterProvider create related topic router to choose topic
by incoming records.
+ * @param initContext context to provide information about the runtime
environment.
+ */
+ public PulsarWriter(
+ SinkConfiguration sinkConfiguration,
+ PulsarSerializationSchema<IN> serializationSchema,
+ TopicMetadataListener metadataListener,
+ SerializableFunction<SinkConfiguration, TopicRouter<IN>>
topicRouterProvider,
+ InitContext initContext) {
+ this.sinkConfiguration = sinkConfiguration;
+ this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
+ this.serializationSchema = serializationSchema;
+ this.topicRouter = topicRouterProvider.apply(sinkConfiguration);
+ this.sinkContextAdapter = new PulsarSinkContextAdapter(initContext,
sinkConfiguration);
+ this.metadataListener = metadataListener;
+ this.mailboxExecutor = initContext.getMailboxExecutor();
Review comment:
Nit: please `checkNotNull` for the ctor arguments
##########
File path:
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import
org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
+import
org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextAdapter;
+import org.apache.flink.connector.pulsar.sink.writer.message.RawMessage;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import
org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
+import
org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.util.IOUtils.closeAll;
+
+/**
+ * This class is responsible to write records in a Pulsar topic and to handle
the different delivery
+ * {@link DeliveryGuarantee}s.
+ *
+ * @param <IN> The type of the input elements.
+ */
+public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN,
PulsarCommittable> {
+ private static final Logger LOG =
LoggerFactory.getLogger(PulsarWriter.class);
+
+ private final SinkConfiguration sinkConfiguration;
+ private final DeliveryGuarantee deliveryGuarantee;
+ private final PulsarSerializationSchema<IN> serializationSchema;
+ private final TopicRouter<IN> topicRouter;
+ private final PulsarSinkContextAdapter sinkContextAdapter;
+ private final TopicMetadataListener metadataListener;
+ private final MailboxExecutor mailboxExecutor;
+ private final TopicProducerRegister producerRegister;
+ private final Semaphore pendingMessages;
+
+ /**
+ * Constructor creating a Pulsar writer.
+ *
+ * <p>It will throw a {@link RuntimeException} if {@link
+ * PulsarSerializationSchema#open(InitializationContext,
PulsarSinkContext, SinkConfiguration)}
+ * fails.
+ *
+ * @param sinkConfiguration the configuration to configure the Pulsar
producer.
+ * @param serializationSchema serialize to transform the incoming records
to {@link RawMessage}.
+ * @param metadataListener the listener for querying topic metadata.
+ * @param topicRouterProvider create related topic router to choose topic
by incoming records.
+ * @param initContext context to provide information about the runtime
environment.
+ */
+ public PulsarWriter(
+ SinkConfiguration sinkConfiguration,
+ PulsarSerializationSchema<IN> serializationSchema,
+ TopicMetadataListener metadataListener,
+ SerializableFunction<SinkConfiguration, TopicRouter<IN>>
topicRouterProvider,
+ InitContext initContext) {
+ this.sinkConfiguration = sinkConfiguration;
+ this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
+ this.serializationSchema = serializationSchema;
+ this.topicRouter = topicRouterProvider.apply(sinkConfiguration);
+ this.sinkContextAdapter = new PulsarSinkContextAdapter(initContext,
sinkConfiguration);
+ this.metadataListener = metadataListener;
+ this.mailboxExecutor = initContext.getMailboxExecutor();
+
+ // Initialize topic metadata listener.
+ LOG.debug("Initialize topic metadata after creating Pulsar writer.");
+ ProcessingTimeService timeService =
initContext.getProcessingTimeService();
+ metadataListener.open(sinkConfiguration, timeService);
+
+ // Initialize topic router.
+ topicRouter.open(sinkConfiguration);
+
+ // Initialize the serialization schema.
+ try {
+ InitializationContext initializationContext =
+ initContext.asSerializationSchemaInitializationContext();
+ serializationSchema.open(initializationContext,
sinkContextAdapter, sinkConfiguration);
+ } catch (Exception e) {
+ throw new FlinkRuntimeException("Cannot initialize schema.", e);
+ }
+
+ // Create this producer register after opening serialization schema!
+ this.producerRegister = new TopicProducerRegister(sinkConfiguration,
serializationSchema);
+ this.pendingMessages = new
Semaphore(sinkConfiguration.getMaxPendingMessages());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void write(IN element, Context context) throws IOException,
InterruptedException {
+ // Serialize the incoming element.
+ sinkContextAdapter.updateTimestamp(context);
+ RawMessage<byte[]> message = serializationSchema.serialize(element,
sinkContextAdapter);
+
+ // Choose the right topic to send.
+ List<String> availableTopics = metadataListener.availableTopics();
+ String topic = topicRouter.route(element, message, availableTopics,
sinkContextAdapter);
+
+ // Create message builder for sending message.
+ TypedMessageBuilder<?> builder = createMessageBuilder(topic,
deliveryGuarantee);
+ if (sinkConfiguration.isEnableSchemaEvolution()) {
+ ((TypedMessageBuilder<IN>) builder).value(element);
+ } else {
+ ((TypedMessageBuilder<byte[]>) builder).value(message.getValue());
+ }
+ message.supplement(builder);
+
+ // Perform message sending.
+ if (deliveryGuarantee == DeliveryGuarantee.NONE) {
+ // We would just ignore the sending exception. This may cause data
loss.
+ builder.sendAsync();
+ } else {
+ // Waiting for permits to write message.
+ pendingMessages.acquire();
+ CompletableFuture<MessageId> sender = builder.sendAsync();
+ sender.whenComplete(
+ (id, ex) -> {
+ pendingMessages.release();
+ if (ex != null) {
+ mailboxExecutor.execute(
+ () -> {
+ throw new FlinkRuntimeException(
+ "Failed to send data to Pulsar
" + topic, ex);
+ },
+ "Failed to send message to Pulsar");
+ } else {
+ LOG.debug("Sent message to Pulsar {} with message
id {}", topic, id);
+ }
+ });
+ }
+ }
+
+ private TypedMessageBuilder<?> createMessageBuilder(
+ String topic, DeliveryGuarantee deliveryGuarantee) {
+ Producer<?> producer = producerRegister.getOrCreateProducer(topic);
+ if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+ Transaction transaction =
producerRegister.getOrCreateTransaction(topic);
+ return producer.newMessage(transaction);
+ } else {
+ return producer.newMessage();
+ }
+ }
+
+ @Override
+ public void flush(boolean endOfInput) throws IOException {
+ while (pendingMessages.availablePermits() <
sinkConfiguration.getMaxPendingMessages()) {
Review comment:
I think you need to flush here if `endOfInput = true` otherwise you
might lose records when the job finishes.
##########
File path:
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommitter.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.committer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils;
+import org.apache.flink.connector.pulsar.sink.PulsarSink;
+import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.CoordinatorNotFoundException;
+import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.InvalidTxnStatusException;
+import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException;
+import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+
+import static
org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static
org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_ASSIGN;
+
+/**
+ * Committer implementation for {@link PulsarSink}.
+ *
+ * <p>The committer is responsible to finalize the Pulsar transactions by
committing them.
+ */
+@Internal
+public class PulsarCommitter implements Committer<PulsarCommittable>,
Closeable {
+ private static final Logger LOG =
LoggerFactory.getLogger(PulsarCommitter.class);
+
+ private final SinkConfiguration sinkConfiguration;
+
+ private PulsarClient pulsarClient;
+ private TransactionCoordinatorClient coordinatorClient;
+
+ public PulsarCommitter(SinkConfiguration sinkConfiguration) {
+ this.sinkConfiguration = sinkConfiguration;
+ }
+
+ @Override
+ public void commit(Collection<CommitRequest<PulsarCommittable>> requests)
+ throws IOException, InterruptedException {
+ TransactionCoordinatorClient client = transactionCoordinatorClient();
+
+ for (CommitRequest<PulsarCommittable> request : requests) {
+ PulsarCommittable committable = request.getCommittable();
+ TxnID txnID = committable.getTxnID();
+ String topic = committable.getTopic();
+
+ LOG.debug("Start committing the Pulsar transaction {} for topic
{}", txnID, topic);
+ try {
+ client.commit(txnID);
+ } catch (TransactionCoordinatorClientException e) {
+ // This is a known bug for Pulsar Transaction. We have to use
instance of.
+ TransactionCoordinatorClientException ex =
PulsarTransactionUtils.unwrap(e);
+ if (ex instanceof CoordinatorNotFoundException) {
+ LOG.error(
+ "We couldn't find the Transaction Coordinator from
Pulsar broker {}. "
+ + "Check your broker configuration.",
+ committable,
+ ex);
+ request.signalFailedWithKnownReason(ex);
+ } else if (ex instanceof InvalidTxnStatusException) {
+ LOG.error(
+ "Unable to commit transaction ({}) because it's in
an invalid state. "
+ + "Most likely the transaction has been
aborted for some reason. "
+ + "Please check the Pulsar broker logs for
more details.",
+ committable,
+ ex);
+ request.signalAlreadyCommitted();
+ } else if (ex instanceof TransactionNotFoundException) {
+ if (request.getNumberOfRetries() == 0) {
+ LOG.error(
+ "Unable to commit transaction ({}) because
it's not found on Pulsar broker. "
+ + "Most likely the checkpoint interval
exceed the transaction timeout.",
+ committable,
+ ex);
+ request.signalFailedWithKnownReason(ex);
+ } else {
+ request.signalAlreadyCommitted();
Review comment:
This case feels strange. Is it really possible that the commit succeeded
although the commit failed earlier and was retried.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]