This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0e72bfede70a00146f466b3e7491fc0f83eb6c41 Author: Yufan Sheng <yu...@streamnative.io> AuthorDate: Tue Feb 15 22:21:50 2022 +0800 [FLINK-26024][connector/pulsar] Create a PulsarSerializationSchema for better records serialization. --- .../pulsar/sink/writer/message/PulsarMessage.java | 111 ++++++++++++++++++ .../sink/writer/message/PulsarMessageBuilder.java | 127 ++++++++++++++++++++ .../writer/serializer/PulsarSchemaWrapper.java | 59 ++++++++++ .../serializer/PulsarSerializationSchema.java | 129 +++++++++++++++++++++ .../PulsarSerializationSchemaWrapper.java | 59 ++++++++++ 5 files changed, 485 insertions(+) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessage.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessage.java new file mode 100644 index 0000000..0c45763 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessage.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.sink.writer.message; + +import org.apache.flink.annotation.PublicEvolving; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Map; + +/** + * The message instance that would be used for {@link TypedMessageBuilder}. We create this class + * because Pulsar lacks such a POJO class. It only stores the given values and exposes getters. + */ +@PublicEvolving +public class PulsarMessage<T> { + + @Nullable private final byte[] orderingKey; + @Nullable private final String key; + private final long eventTime; + private final Schema<T> schema; + @Nullable private final T value; + @Nullable private final Map<String, String> properties; + @Nullable private final Long sequenceId; + @Nullable private final List<String> replicationClusters; + private final boolean disableReplication; + + /** Package-private so that this class is only built through {@link PulsarMessageBuilder}. 
*/ + PulsarMessage( + @Nullable byte[] orderingKey, + @Nullable String key, + long eventTime, + Schema<T> schema, + @Nullable T value, + @Nullable Map<String, String> properties, + @Nullable Long sequenceId, + @Nullable List<String> replicationClusters, + boolean disableReplication) { + this.orderingKey = orderingKey; + this.key = key; + this.eventTime = eventTime; + this.schema = schema; + this.value = value; + this.properties = properties; + this.sequenceId = sequenceId; + this.replicationClusters = replicationClusters; + this.disableReplication = disableReplication; + } + + @Nullable + public byte[] getOrderingKey() { + return orderingKey; + } + + @Nullable + public String getKey() { + return key; + } + + public long getEventTime() { + return eventTime; + } + + public Schema<T> getSchema() { + return schema; + } + + @Nullable + public T getValue() { + return value; + } + + @Nullable + public Map<String, String> getProperties() { + return properties; + } + + @Nullable + public Long getSequenceId() { + return sequenceId; + } + + @Nullable + public List<String> getReplicationClusters() { + return replicationClusters; + } + + public boolean isDisableReplication() { + return disableReplication; + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessageBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessageBuilder.java new file mode 100644 index 0000000..9330d09 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessageBuilder.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink.writer.message; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.pulsar.sink.writer.router.KeyHashTopicRouter; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** {@link TypedMessageBuilder} wrapper for providing the required methods for end-users. */ +@PublicEvolving +public class PulsarMessageBuilder<T> { + + private byte[] orderingKey; + private String key; + private long eventTime; + Schema<T> schema; + private T value; + private Map<String, String> properties = new HashMap<>(); + private Long sequenceId; + private List<String> replicationClusters; + private boolean disableReplication = false; + + /** Method wrapper of {@link TypedMessageBuilder#orderingKey(byte[])}. */ + public PulsarMessageBuilder<T> orderingKey(byte[] orderingKey) { + this.orderingKey = checkNotNull(orderingKey); + return this; + } + + /** + * Property {@link TypedMessageBuilder#key(String)}; also used by {@link KeyHashTopicRouter}. 
+ */ + public PulsarMessageBuilder<T> key(String key) { + this.key = checkNotNull(key); + return this; + } + + /** Method wrapper of {@link TypedMessageBuilder#eventTime(long)}. */ + public PulsarMessageBuilder<T> eventTime(long eventTime) { + this.eventTime = eventTime; + return this; + } + + /** + * Method wrapper of {@link TypedMessageBuilder#value(Object)}. You can pass any schema for + * validating it on Pulsar. This is called schema evolution. But the topic on Pulsar should bind + * to a fixed {@link Schema}. You can't have multiple schemas on the same topic unless they + * are compatible with each other. + * + * @param value The value could be null, which is called a tombstone message in Pulsar. (It + * will be skipped and considered deleted.) + */ + public PulsarMessageBuilder<T> value(Schema<T> schema, T value) { + this.schema = checkNotNull(schema); + this.value = value; + return this; + } + + /** Method wrapper of {@link TypedMessageBuilder#property(String, String)}. */ + public PulsarMessageBuilder<T> property(String key, String value) { + this.properties.put(checkNotNull(key), checkNotNull(value)); + return this; + } + + /** Method wrapper of {@link TypedMessageBuilder#properties(Map)}. */ + public PulsarMessageBuilder<T> properties(Map<String, String> properties) { + this.properties.putAll(checkNotNull(properties)); + return this; + } + + /** Method wrapper of {@link TypedMessageBuilder#sequenceId(long)}. */ + public PulsarMessageBuilder<T> sequenceId(long sequenceId) { + this.sequenceId = sequenceId; + return this; + } + + /** Method wrapper of {@link TypedMessageBuilder#replicationClusters(List)}. */ + public PulsarMessageBuilder<T> replicationClusters(List<String> replicationClusters) { + this.replicationClusters = checkNotNull(replicationClusters); + return this; + } + + /** Method wrapper of {@link TypedMessageBuilder#disableReplication()}. 
*/ + public PulsarMessageBuilder<T> disableReplication() { + this.disableReplication = true; + return this; + } + + /** Builds the {@link PulsarMessage}; a schema must be set by {@link #value(Schema, Object)}. */ + public PulsarMessage<T> build() { + checkNotNull(schema, "Schema should be provided."); + + return new PulsarMessage<>( + orderingKey, + key, + eventTime, + schema, + value, + properties, + sequenceId, + replicationClusters, + disableReplication); + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSchemaWrapper.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSchemaWrapper.java new file mode 100644 index 0000000..0d5aaf0 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSchemaWrapper.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.sink.writer.serializer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.pulsar.common.schema.PulsarSchema; +import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext; +import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage; +import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessageBuilder; + +import org.apache.pulsar.client.api.Schema; + +/** + * Wraps Pulsar's Schema into a PulsarSerializationSchema. If schema evolution is enabled, the + * element is sent with that schema; otherwise it is encoded and sent as {@link Schema#BYTES}. + */ +@Internal +public class PulsarSchemaWrapper<IN> implements PulsarSerializationSchema<IN> { + private static final long serialVersionUID = -2567052498398184194L; + + private final PulsarSchema<IN> pulsarSchema; + + public PulsarSchemaWrapper(PulsarSchema<IN> pulsarSchema) { + this.pulsarSchema = pulsarSchema; + } + + @Override + public PulsarMessage<?> serialize(IN element, PulsarSinkContext sinkContext) { + Schema<IN> schema = this.pulsarSchema.getPulsarSchema(); + if (sinkContext.isEnableSchemaEvolution()) { + PulsarMessageBuilder<IN> builder = new PulsarMessageBuilder<>(); + builder.value(schema, element); + + return builder.build(); + } else { + PulsarMessageBuilder<byte[]> builder = new PulsarMessageBuilder<>(); + byte[] bytes = schema.encode(element); + builder.value(Schema.BYTES, bytes); + + return builder.build(); + } + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchema.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchema.java new file mode 100644 index 0000000..da7f706 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchema.java @@ -0,0 +1,129 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink.writer.serializer; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext; +import org.apache.flink.connector.pulsar.common.schema.PulsarSchema; +import org.apache.flink.connector.pulsar.sink.PulsarSinkBuilder; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext; +import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage; +import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessageBuilder; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.common.schema.KeyValue; + +import java.io.Serializable; + +/** + * The serialization schema for how to serialize records into Pulsar. + * + * @param <IN> The message type sent to Pulsar. 
+ */ +@PublicEvolving +public interface PulsarSerializationSchema<IN> extends Serializable { + + /** + * Initialization method for the schema. It is called before the actual working methods {@link + * #serialize(Object, PulsarSinkContext)} and is thus suitable for one-time setup work. + * + * <p>The provided {@link InitializationContext} can be used to access additional features such + * as registering user metrics. + * + * @param initializationContext Contextual information that can be used during initialization. + * @param sinkContext Runtime information i.e. partitions, subtaskId. + * @param sinkConfiguration All the configuration options for the Pulsar sink. You can add + * custom options. + */ + default void open( + InitializationContext initializationContext, + PulsarSinkContext sinkContext, + SinkConfiguration sinkConfiguration) + throws Exception { + // Nothing to do by default. + } + + /** + * Serializes the given element into bytes and {@link Schema#BYTES}. Or you can convert it to a + * new type of instance with a {@link Schema}. The return value {@link PulsarMessage} can be + * built by {@link PulsarMessageBuilder}. All the methods provided in the {@link + * PulsarMessageBuilder} are equivalent to those of the {@link TypedMessageBuilder}. + * + * @param element Element to be serialized. + * @param sinkContext Context to provide extra information. + */ + PulsarMessage<?> serialize(IN element, PulsarSinkContext sinkContext); + + /** + * Create a PulsarSerializationSchema by using Flink's {@link SerializationSchema}. It would + * serialize the message into a byte array and send it to Pulsar with {@link Schema#BYTES}. + */ + static <T> PulsarSerializationSchema<T> flinkSchema( + SerializationSchema<T> serializationSchema) { + return new PulsarSerializationSchemaWrapper<>(serializationSchema); + } + + /** + * Create a PulsarSerializationSchema by using the Pulsar {@link Schema} instance. 
We can send + * message with the given schema to Pulsar, this would be enabled by {@link + * PulsarSinkBuilder#enableSchemaEvolution()}. We would serialize the message into bytes and + * send it as {@link Schema#BYTES} by default. + * + * <p>We only support <a + * href="https://pulsar.apache.org/docs/en/schema-understand/#primitive-type">primitive + * types</a> here. + */ + static <T> PulsarSerializationSchema<T> pulsarSchema(Schema<T> schema) { + PulsarSchema<T> pulsarSchema = new PulsarSchema<>(schema); + return new PulsarSchemaWrapper<>(pulsarSchema); + } + + /** + * Create a PulsarSerializationSchema by using the Pulsar {@link Schema} instance. We can send + * message with the given schema to Pulsar, this would be enabled by {@link + * PulsarSinkBuilder#enableSchemaEvolution()}. We would serialize the message into bytes and + * send it as {@link Schema#BYTES} by default. + * + * <p>We only support <a + * href="https://pulsar.apache.org/docs/en/schema-understand/#struct">struct types</a> here. + */ + static <T> PulsarSerializationSchema<T> pulsarSchema(Schema<T> schema, Class<T> typeClass) { + PulsarSchema<T> pulsarSchema = new PulsarSchema<>(schema, typeClass); + return new PulsarSchemaWrapper<>(pulsarSchema); + } + + /** + * Create a PulsarSerializationSchema by using the Pulsar {@link Schema} instance. We can send + * message with the given schema to Pulsar, this would be enabled by {@link + * PulsarSinkBuilder#enableSchemaEvolution()}. We would serialize the message into bytes and + * send it as {@link Schema#BYTES} by default. + * + * <p>We only support <a + * href="https://pulsar.apache.org/docs/en/schema-understand/#keyvalue">keyvalue types</a> here. 
+ */ + static <K, V> PulsarSerializationSchema<KeyValue<K, V>> pulsarSchema( + Schema<KeyValue<K, V>> schema, Class<K> keyClass, Class<V> valueClass) { + PulsarSchema<KeyValue<K, V>> pulsarSchema = + new PulsarSchema<>(schema, keyClass, valueClass); + return new PulsarSchemaWrapper<>(pulsarSchema); + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchemaWrapper.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchemaWrapper.java new file mode 100644 index 0000000..716d2db --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/serializer/PulsarSerializationSchemaWrapper.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.sink.writer.serializer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext; +import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext; +import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage; +import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessageBuilder; + +import org.apache.pulsar.client.api.Schema; + +/** Wraps Flink's SerializationSchema; the serialized bytes are sent with {@link Schema#BYTES}. */ +@Internal +public class PulsarSerializationSchemaWrapper<IN> implements PulsarSerializationSchema<IN> { + private static final long serialVersionUID = 4948155843623161119L; + + private final SerializationSchema<IN> serializationSchema; + + public PulsarSerializationSchemaWrapper(SerializationSchema<IN> serializationSchema) { + this.serializationSchema = serializationSchema; + } + + @Override + public void open( + InitializationContext initializationContext, + PulsarSinkContext sinkContext, + SinkConfiguration sinkConfiguration) + throws Exception { + serializationSchema.open(initializationContext); + } + + @Override + public PulsarMessage<?> serialize(IN element, PulsarSinkContext sinkContext) { + PulsarMessageBuilder<byte[]> builder = new PulsarMessageBuilder<>(); + byte[] value = serializationSchema.serialize(element); + builder.value(Schema.BYTES, value); + + return builder.build(); + } +}