[
https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504826#comment-16504826
]
ASF GitHub Bot commented on FLINK-9168:
---------------------------------------
Github user surryr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5845#discussion_r193793061
--- Diff:
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import
org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.util.SerializableObject;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Flink Sink to produce data into a Pulsar topic.
+ */
+public class FlinkPulsarProducer<IN>
+ extends RichSinkFunction<IN>
+ implements CheckpointedFunction {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkPulsarProducer.class);
+
+ /**
+ * The pulsar service url.
+ */
+ protected final String serviceUrl;
+
+ /**
+ * User defined configuration for the producer.
+ */
+ protected final ProducerConfiguration producerConfig;
+
+ /**
+ * The name of the default topic this producer is writing data to.
+ */
+ protected final String defaultTopicName;
+
+ /**
+ * (Serializable) SerializationSchema for turning objects used with
Flink into.
+ * byte[] for Pulsar.
+ */
+ protected final SerializationSchema<IN> schema;
+
+ /**
+ * User-provided key extractor for assigning a key to a pulsar message.
+ */
+ protected final PulsarKeyExtractor<IN> flinkPulsarKeyExtractor;
+
+ /**
+ * Produce Mode.
+ */
+ protected PulsarProduceMode produceMode =
PulsarProduceMode.AT_LEAST_ONE;
+
+ /**
+ * If true, the producer will wait until all outstanding records have
been send to the broker.
+ */
+ protected boolean flushOnCheckpoint;
+
+ // -------------------------------- Runtime fields
------------------------------------------
+
+ /** Pulsar Producer instance. */
+ protected transient Producer producer;
+
+ /** The callback than handles error propagation or logging callbacks. */
+ protected transient Function<MessageId, MessageId> successCallback =
msgId -> {
+ acknowledgeMessage();
+ return msgId;
+ };
+
+ protected transient Function<Throwable, MessageId> failureCallback;
+
+ /** Errors encountered in the async producer are stored here. */
+ protected transient volatile Exception asyncException;
+
+ /** Lock for accessing the pending records. */
+ protected final SerializableObject pendingRecordsLock = new
SerializableObject();
+
+ /** Number of unacknowledged records. */
+ protected long pendingRecords;
+
+ public FlinkPulsarProducer(String serviceUrl,
+ String defaultTopicName,
+ SerializationSchema<IN>
serializationSchema,
+ ProducerConfiguration
producerConfig,
+ PulsarKeyExtractor<IN>
keyExtractor) {
+ this.serviceUrl = checkNotNull(serviceUrl, "Service url not
set");
+ this.defaultTopicName = checkNotNull(defaultTopicName,
"TopicName not set");
+ this.schema = checkNotNull(serializationSchema, "Serialization
Schema not set");
+ this.producerConfig = checkNotNull(producerConfig, "Producer
Config is not set");
+ this.flinkPulsarKeyExtractor =
getOrNullKeyExtractor(keyExtractor);
+ ClosureCleaner.ensureSerializable(serializationSchema);
+ }
+
+ // ---------------------------------- Properties
--------------------------
+
+
+ /**
+ * @return pulsar key extractor.
+ */
+ public PulsarKeyExtractor<IN> getKeyExtractor() {
+ return flinkPulsarKeyExtractor;
+ }
+
+ /**
+ * Gets this producer's operating mode.
+ */
+ public PulsarProduceMode getProduceMode() {
+ return this.produceMode;
+ }
+
+ /**
+ * Sets this producer's operating mode.
+ *
+ * @param produceMode The mode of operation.
+ */
+ public void setProduceMode(PulsarProduceMode produceMode) {
+ this.produceMode = checkNotNull(produceMode);
+ }
+
+ /**
+ * If set to true, the Flink producer will wait for all outstanding
messages in the Pulsar buffers
+ * to be acknowledged by the Pulsar producer on a checkpoint.
+ * This way, the producer can guarantee that messages in the Pulsar
buffers are part of the checkpoint.
+ *
+ * @param flush Flag indicating the flushing mode (true = flush on
checkpoint)
+ */
+ public void setFlushOnCheckpoint(boolean flush) {
+ this.flushOnCheckpoint = flush;
+ }
+
+ // ----------------------------------- Sink Methods
--------------------------
+
+ @SuppressWarnings("unchecked")
+ private static final <T> PulsarKeyExtractor<T>
getOrNullKeyExtractor(PulsarKeyExtractor<T> extractor) {
+ if (null == extractor) {
+ return PulsarKeyExtractor.NULL;
+ } else {
+ return extractor;
+ }
+ }
+
+ private Producer createProducer(ProducerConfiguration configuration)
throws Exception {
+ PulsarClient client = PulsarClient.create(serviceUrl);
+ return client.createProducer(defaultTopicName, configuration);
+ }
+
+ /**
+ * Initializes the connection to pulsar.
+ *
+ * @param parameters configuration used for initialization
+ * @throws Exception
+ */
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ this.producer = createProducer(producerConfig);
+
+ RuntimeContext ctx = getRuntimeContext();
+
+ LOG.info("Starting FlinkPulsarProducer ({}/{}) to produce into
pulsar topic {}",
+ ctx.getIndexOfThisSubtask() + 1,
ctx.getNumberOfParallelSubtasks(), defaultTopicName);
+
+ if (flushOnCheckpoint && !((StreamingRuntimeContext)
this.getRuntimeContext()).isCheckpointingEnabled()) {
+ LOG.warn("Flushing on checkpoint is enabled, but
checkpointing is not enabled. Disabling flushing.");
+ flushOnCheckpoint = false;
+ }
+
+ if (PulsarProduceMode.AT_MOST_ONCE == produceMode) {
+ this.failureCallback = cause -> {
+ LOG.error("Error while sending record to Pulsar
: " + cause.getMessage(), cause);
+ return null;
+ };
+ } else if (PulsarProduceMode.AT_LEAST_ONE == produceMode){
+ this.failureCallback = cause -> {
+ if (null == asyncException) {
+ if (cause instanceof Exception) {
+ asyncException = (Exception)
cause;
+ } else {
+ asyncException = new
Exception(cause);
+ }
+ }
+ return null;
+ };
+ } else {
+ throw new UnsupportedOperationException("Unsupported
produce mode " + produceMode);
+ }
+ }
+
+ @Override
+ public void invoke(IN value, Context context) throws Exception {
+ checkErroneous();
+
+ byte[] serializedValue = schema.serialize(value);
+
+ MessageBuilder msgBuilder = MessageBuilder.create();
+ if (null != context.timestamp()) {
+ msgBuilder =
msgBuilder.setEventTime(context.timestamp());
+ }
+ String msgKey = flinkPulsarKeyExtractor.getKey(value);
+ if (null != msgKey) {
+ msgBuilder = msgBuilder.setKey(msgKey);
+ }
+ Message message = msgBuilder
+ .setContent(serializedValue)
+ .build();
+
+ if (flushOnCheckpoint) {
+ synchronized (pendingRecordsLock) {
+ pendingRecords++;
+ }
+ }
+ producer.sendAsync(message)
+ .thenApply(successCallback)
+ .exceptionally(failureCallback);
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (producer != null) {
+ producer.close();
+ }
+
+ // make sure we propagate pending errors
+ checkErroneous();
+ }
+
+ // ------------------- Logic for handling checkpoint flushing
-------------------------- //
+
+ private void acknowledgeMessage() {
+ if (flushOnCheckpoint) {
+ synchronized (pendingRecordsLock) {
+ pendingRecords--;
+ if (pendingRecords == 0) {
+ pendingRecordsLock.notifyAll();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws
Exception {
+ // check for asynchronous errors and fail the checkpoint if
necessary
+ checkErroneous();
+
+ if (flushOnCheckpoint) {
+ // wait until all the messages are acknowledged
+ synchronized (pendingRecordsLock) {
+ while (pendingRecords > 0) {
+ pendingRecordsLock.wait(100);
+ }
+ }
+
+ // if the flushed requests has errors, we should
propagate it also and fail the checkpoint
+ checkErroneous();
+ }
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context)
throws Exception {
+ // nothing to do
+ }
+
+ // ----------------------------------- Utilities
--------------------------
+
+ protected void checkErroneous() throws Exception {
+ Exception e = asyncException;
+ if (e != null) {
+ // prevent double throwing
+ asyncException = null;
+ throw new Exception("Failed to send data to Kafka: " +
e.getMessage(), e);
--- End diff --
Seems like typo is here.
> Pulsar Sink Connector
> ---------------------
>
> Key: FLINK-9168
> URL: https://issues.apache.org/jira/browse/FLINK-9168
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Affects Versions: 1.6.0
> Reporter: Zongyang Xiao
> Priority: Minor
> Fix For: 1.6.0
>
>
> Flink does not provide a sink connector for Pulsar.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)