This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit cc508f5c77516e3c3420475ee6c451602d9fe052 Author: thexia <[email protected]> AuthorDate: Wed May 18 10:38:34 2022 +0800 [INLONG-4141][Sort] Sort lightweight support load data from Pulsar (#4231) --- .../org/apache/inlong/sort/protocol/node/Node.java | 2 + .../protocol/node/extract/PulsarExtractNode.java | 123 +++++++ .../node/extract/PulsarExtractNodeTest.java | 55 +++ inlong-sort/sort-connectors/pom.xml | 12 + .../table/DynamicPulsarSerializationSchema.java | 253 ++++++++++++++ .../pulsar/table/PulsarDynamicTableFactory.java | 370 ++++++++++++++++++++ .../flink/pulsar/table/PulsarDynamicTableSink.java | 382 +++++++++++++++++++++ .../table/UpsertPulsarDynamicTableFactory.java | 364 ++++++++++++++++++++ .../org.apache.flink.table.factories.Factory | 3 +- .../flink/parser/PulsarSqlParserTest.java | 108 ++++++ .../org.apache.flink.table.factories.Factory | 4 +- licenses/inlong-sort/LICENSE | 7 + pom.xml | 7 + 13 files changed, 1688 insertions(+), 2 deletions(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java index 56f53bce6..7e3eafb2e 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java @@ -26,6 +26,7 @@ import org.apache.inlong.sort.protocol.node.extract.FileSystemExtractNode; import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode; import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode; import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode; +import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode; import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode; import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode; import org.apache.inlong.sort.protocol.node.load.HiveLoadNode; @@ -50,6 +51,7 @@ import java.util.TreeMap; 
@JsonSubTypes.Type(value = KafkaExtractNode.class, name = "kafkaExtract"), @JsonSubTypes.Type(value = PostgresExtractNode.class, name = "postgresExtract"), @JsonSubTypes.Type(value = FileSystemExtractNode.class, name = "fileSystemExtract"), + @JsonSubTypes.Type(value = PulsarExtractNode.class, name = "pulsarExtract"), @JsonSubTypes.Type(value = TransformNode.class, name = "baseTransform"), @JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad"), @JsonSubTypes.Type(value = DistinctNode.class, name = "distinct"), diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java new file mode 100644 index 000000000..270cd0f66 --- /dev/null +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.protocol.node.extract; + +import com.google.common.base.Preconditions; +import lombok.Data; +import lombok.EqualsAndHashCode; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.node.ExtractNode; +import org.apache.inlong.sort.protocol.node.format.Format; +import org.apache.inlong.sort.protocol.transformation.WatermarkField; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.List; +import java.util.Map; + +@EqualsAndHashCode(callSuper = true) +@JsonTypeName("pulsarExtract") +@Data +public class PulsarExtractNode extends ExtractNode { + private static final long serialVersionUID = 1L; + + @Nonnull + @JsonProperty("topic") + private String topic; + @Nonnull + @JsonProperty("adminUrl") + private String adminUrl; + @Nonnull + @JsonProperty("serviceUrl") + private String serviceUrl; + @Nonnull + @JsonProperty("format") + private Format format; + + @JsonProperty("scanStartupMode") + private String scanStartupMode; + + @JsonProperty("primaryKey") + private String primaryKey; + + @JsonCreator + public PulsarExtractNode(@JsonProperty("id") String id, + @JsonProperty("name") String name, + @JsonProperty("fields") List<FieldInfo> fields, + @Nullable @JsonProperty("watermarkField") WatermarkField watermarkField, + @JsonProperty("properties") Map<String, String> properties, + @Nonnull @JsonProperty("topic") String topic, + @Nonnull @JsonProperty("adminUrl") String adminUrl, + @Nonnull @JsonProperty("serviceUrl") String serviceUrl, + @Nonnull @JsonProperty("format") Format format, + @Nonnull @JsonProperty("scanStartupMode") String scanStartupMode, + 
@JsonProperty("primaryKey") String primaryKey) { + super(id, name, fields, watermarkField, properties); + this.topic = Preconditions.checkNotNull(topic, "pulsar topic is null."); + this.adminUrl = Preconditions.checkNotNull(adminUrl, "pulsar adminUrl is null."); + this.serviceUrl = Preconditions.checkNotNull(serviceUrl, "pulsar serviceUrl is null."); + this.format = Preconditions.checkNotNull(format, "pulsar format is null."); + this.scanStartupMode = Preconditions.checkNotNull(scanStartupMode, + "pulsar scanStartupMode is null."); + this.primaryKey = primaryKey; + } + + /** + * generate table options + * + * @return options + */ + @Override + public Map<String, String> tableOptions() { + Map<String, String> options = super.tableOptions(); + if (StringUtils.isEmpty(this.primaryKey)) { + options.put("connector", "pulsar-inlong"); + options.putAll(format.generateOptions(false)); + } else { + options.put("connector", "upsert-pulsar-inlong"); + options.putAll(format.generateOptions(true)); + } + options.put("generic", "true"); + options.put("service-url", serviceUrl); + options.put("admin-url", adminUrl); + options.put("topic", topic); + options.put("scan.startup.mode", scanStartupMode); + + return options; + } + + @Override + public String genTableName() { + return String.format("table_%s", super.getId()); + } + + @Override + public String getPrimaryKey() { + return primaryKey; + } + + @Override + public List<FieldInfo> getPartitionFields() { + return super.getPartitionFields(); + } +} diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java new file mode 100644 index 000000000..0ab2fc81b --- /dev/null +++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.protocol.node.extract; + +import org.apache.inlong.sort.SerializeBaseTest; +import org.apache.inlong.sort.formats.common.IntFormatInfo; +import org.apache.inlong.sort.formats.common.StringFormatInfo; +import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.node.Node; +import org.apache.inlong.sort.protocol.node.format.CsvFormat; +import org.apache.inlong.sort.protocol.node.format.Format; + +import java.util.Arrays; +import java.util.List; + +/** + * Test for {@link PulsarExtractNode} + */ +public class PulsarExtractNodeTest extends SerializeBaseTest<Node> { + + @Override + public Node getTestObject() { + List<FieldInfo> fields = Arrays.asList( + new FieldInfo("name", new StringFormatInfo()), + new FieldInfo("age", new IntFormatInfo())); + Format format = new CsvFormat(); + return new PulsarExtractNode("2", + "pulsar_input", + fields, + null, + null, + "persistent://public/default/test_stream", + "http://localhost:8080", + "pulsar://localhost:6650", + format, + "earliest", + null); + } +} diff --git a/inlong-sort/sort-connectors/pom.xml b/inlong-sort/sort-connectors/pom.xml index eeb4a41d3..a94e0e310 100644 --- a/inlong-sort/sort-connectors/pom.xml +++ 
b/inlong-sort/sort-connectors/pom.xml @@ -69,6 +69,11 @@ <artifactId>flink-table-common</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId> + <scope>provided</scope> + </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-parquet_${flink.scala.binary.version}</artifactId> @@ -139,6 +144,12 @@ <artifactId>hadoop-minicluster</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>io.streamnative.connectors</groupId> + <artifactId>pulsar-flink-connector_${scala.binary.version}</artifactId> + </dependency> + + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-test-utils_${flink.scala.binary.version}</artifactId> @@ -156,6 +167,7 @@ </exclusions> </dependency> + </dependencies> </project> \ No newline at end of file diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/DynamicPulsarSerializationSchema.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/DynamicPulsarSerializationSchema.java new file mode 100644 index 000000000..4c2037701 --- /dev/null +++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/DynamicPulsarSerializationSchema.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.flink.pulsar.table; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.formats.protobuf.PbFormatOptions; +import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema; +import org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils; +import org.apache.flink.streaming.util.serialization.FlinkSchema; +import org.apache.flink.streaming.util.serialization.PulsarContextAware; +import org.apache.flink.streaming.util.serialization.PulsarSerializationSchema; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; + +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkArgument; + +/** + * A specific Serializer for {@link 
PulsarDynamicTableSink}. + */ +class DynamicPulsarSerializationSchema + implements PulsarSerializationSchema<RowData>, PulsarContextAware<RowData> { + + private static final long serialVersionUID = 1L; + + private final @Nullable + SerializationSchema<RowData> keySerialization; + + private final SerializationSchema<RowData> valueSerialization; + + private final RowData.FieldGetter[] keyFieldGetters; + + private final RowData.FieldGetter[] valueFieldGetters; + + private final boolean hasMetadata; + + private final boolean upsertMode; + + /** + * Contains the position for each value of {@link PulsarDynamicTableSink.WritableMetadata} in the consumed row or + * -1 if this metadata key is not used. + */ + private final int[] metadataPositions; + + private int[] partitions; + + private int parallelInstanceId; + + private int numParallelInstances; + + private DataType valueDataType; + + private String valueFormatType; + + private volatile Schema<RowData> schema; + + /** + * delay milliseconds message. 
+ */ + private long delayMilliseconds; + + DynamicPulsarSerializationSchema( + @Nullable SerializationSchema<RowData> keySerialization, + SerializationSchema<RowData> valueSerialization, + RowData.FieldGetter[] keyFieldGetters, + RowData.FieldGetter[] valueFieldGetters, + boolean hasMetadata, + int[] metadataPositions, + boolean upsertMode, + DataType valueDataType, + String valueFormatType, + long delayMilliseconds) { + if (upsertMode) { + checkArgument(keySerialization != null && keyFieldGetters.length > 0, + "Key must be set in upsert mode for serialization schema."); + } + this.keySerialization = keySerialization; + this.valueSerialization = valueSerialization; + this.keyFieldGetters = keyFieldGetters; + this.valueFieldGetters = valueFieldGetters; + this.hasMetadata = hasMetadata; + this.metadataPositions = metadataPositions; + this.upsertMode = upsertMode; + this.valueDataType = valueDataType; + this.valueFormatType = valueFormatType; + this.delayMilliseconds = delayMilliseconds; + } + + @Override + public void open(SerializationSchema.InitializationContext context) throws Exception { + if (keySerialization != null) { + keySerialization.open(context); + } + valueSerialization.open(context); + } + + @Override + public void serialize(RowData consumedRow, TypedMessageBuilder<RowData> messageBuilder) { + + // shortcut in case no input projection is required + if (keySerialization == null && !hasMetadata) { + messageBuilder.value(consumedRow); + return; + } + + // set delay message. 
+ if (delayMilliseconds > 0) { + messageBuilder.deliverAfter(delayMilliseconds, TimeUnit.MILLISECONDS); + } + + if (keySerialization != null) { + final RowData keyRow = createProjectedRow(consumedRow, RowKind.INSERT, keyFieldGetters); + messageBuilder.keyBytes(keySerialization.serialize(keyRow)); + } + + final RowKind kind = consumedRow.getRowKind(); + final RowData valueRow = createProjectedRow(consumedRow, kind, valueFieldGetters); + if (upsertMode) { + if (kind == RowKind.DELETE || kind == RowKind.UPDATE_BEFORE) { + // transform the message as the tombstone message + } else { + // make the message to be INSERT to be compliant with the INSERT-ONLY format + valueRow.setRowKind(RowKind.INSERT); + messageBuilder.value(valueRow); + } + } else { + messageBuilder.value(valueRow); + } + + Map<String, String> properties = readMetadata(consumedRow, PulsarDynamicTableSink.WritableMetadata.PROPERTIES); + if (properties != null) { + messageBuilder.properties(properties); + } + final Long eventTime = readMetadata(consumedRow, PulsarDynamicTableSink.WritableMetadata.EVENT_TIME); + if (eventTime != null && eventTime >= 0) { + messageBuilder.eventTime(eventTime); + } + } + + public Optional<String> getTargetTopic(RowData element) { + //TODO need get topic from row. 
+ return Optional.empty(); + } + + @SuppressWarnings("unchecked") + private <T> T readMetadata(RowData consumedRow, PulsarDynamicTableSink.WritableMetadata metadata) { + final int pos = metadataPositions[metadata.ordinal()]; + if (pos < 0) { + return null; + } + return (T) metadata.converter.read(consumedRow, pos); + } + + private static RowData createProjectedRow(RowData consumedRow, RowKind kind, RowData.FieldGetter[] fieldGetters) { + final int arity = fieldGetters.length; + final GenericRowData genericRowData = new GenericRowData(kind, arity); + for (int fieldPos = 0; fieldPos < arity; fieldPos++) { + genericRowData.setField(fieldPos, fieldGetters[fieldPos].getFieldOrNull(consumedRow)); + } + return genericRowData; + } + + @Override + public TypeInformation<RowData> getProducedType() { + final RowType rowType = (RowType) valueDataType.getLogicalType(); + return InternalTypeInfo.of(rowType); + } + + @Override + public void setParallelInstanceId(int parallelInstanceId) { + this.parallelInstanceId = parallelInstanceId; + } + + @Override + public void setNumParallelInstances(int numParallelInstances) { + this.numParallelInstances = numParallelInstances; + } + + @Override + public Schema<RowData> getSchema() { + if (schema == null) { + synchronized (this) { + if (schema == null) { + schema = buildSchema(); + } + } + } + return schema; + } + + private FlinkSchema<RowData> buildSchema() { + if (StringUtils.isBlank(valueFormatType)) { + return new FlinkSchema<>(Schema.BYTES.getSchemaInfo(), valueSerialization, null); + } + Configuration configuration = new Configuration(); + hackPbSerializationSchema(configuration); + SchemaInfo schemaInfo = SchemaUtils.tableSchemaToSchemaInfo(valueFormatType, valueDataType, configuration); + return new FlinkSchema<>(schemaInfo, valueSerialization, null); + } + + private void hackPbSerializationSchema(Configuration configuration) { + // reflect read PbRowSerializationSchema#messageClassName + if (valueSerialization instanceof 
PbRowDataSerializationSchema) { + try { + final String messageClassName = + (String) FieldUtils.readDeclaredField(valueSerialization, "messageClassName", true); + configuration.set(PbFormatOptions.MESSAGE_CLASS_NAME, messageClassName); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + } + } + + // -------------------------------------------------------------------------------------------- + + interface MetadataConverter extends Serializable { + Object read(RowData consumedRow, int pos); + } +} \ No newline at end of file diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/PulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/PulsarDynamicTableFactory.java new file mode 100644 index 000000000..c6be8c5e1 --- /dev/null +++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/PulsarDynamicTableFactory.java @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.flink.pulsar.table; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource; +import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.format.Format; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; + +import org.apache.pulsar.common.naming.TopicName; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.ADMIN_URL; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.GENERIC; +import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FIELDS; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FIELDS_PREFIX; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FORMAT; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.PARTITION_DISCOVERY_INTERVAL_MILLIS; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.PROPERTIES; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.PROPERTIES_PREFIX; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SCAN_STARTUP_MODE; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SCAN_STARTUP_SPECIFIC_OFFSETS; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SCAN_STARTUP_SUB_NAME; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SCAN_STARTUP_SUB_START_OFFSET; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SERVICE_URL; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SINK_MESSAGE_ROUTER; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SINK_SEMANTIC; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.TOPIC; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.TOPIC_PATTERN; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.VALUE_FIELDS_INCLUDE; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.VALUE_FORMAT; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.createKeyFormatProjection; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.createValueFormatProjection; +import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.getMessageRouter; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.getPulsarProperties; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.validateSinkMessageRouter; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.validateTableSourceOptions; +import static org.apache.flink.table.factories.FactoryUtil.FORMAT; +import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM; + +/** + * Copy from io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9 + * + * Factory for creating configured instances of + * {@link org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableFactory}. + * We modify PulsarDynamicTableFactory validate logic to support nested format. + */ +public class PulsarDynamicTableFactory implements + DynamicTableSourceFactory, + DynamicTableSinkFactory { + + public static final String IDENTIFIER = "pulsar-inlong"; + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + ReadableConfig tableOptions = helper.getOptions(); + + List<String> topics = generateTopic(context.getObjectIdentifier(), tableOptions); + if (topics != null && !topics.isEmpty()) { + ((Configuration) tableOptions).set(TOPIC, Collections.singletonList(topics.get(0))); + } + + final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat = + getKeyEncodingFormat(helper); + + final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = + getValueEncodingFormat(helper); + + // Validate the option data type. + helper.validateExcept(PROPERTIES_PREFIX, "type", "table-default-partitions", "default-database"); + // Validate the option values. 
+ PulsarTableOptions.validateTableSinkOptions(tableOptions); + + Properties properties = getPulsarProperties(context.getCatalogTable().toProperties()); + validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueEncodingFormat); + + final DataType physicalDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType(); + + final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType); + + final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType); + + final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); + + String adminUrl = tableOptions.get(ADMIN_URL); + String serverUrl = tableOptions.get(SERVICE_URL); + return createPulsarTableSink(tableOptions, topics, adminUrl, serverUrl, keyEncodingFormat, valueEncodingFormat, + properties, physicalDataType, keyProjection, valueProjection, keyPrefix, context); + } + + private PulsarDynamicTableSink createPulsarTableSink(ReadableConfig tableOptions, List<String> topics, + String adminUrl, String serverUrl, + Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat, + EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat, + Properties properties, DataType physicalDataType, + int[] keyProjection, int[] valueProjection, + String keyPrefix, Context context) { + + final String formatType = tableOptions.getOptional(FORMAT).orElseGet(() -> tableOptions.get(VALUE_FORMAT)); + final Integer parallelism = tableOptions.getOptional(SINK_PARALLELISM).orElse(null); + return new PulsarDynamicTableSink( + serverUrl, + adminUrl, + topics.get(0), + physicalDataType, + properties, + keyEncodingFormat.orElse(null), + valueEncodingFormat, + keyProjection, + valueProjection, + keyPrefix, + PulsarTableOptions.getSinkSemantic(tableOptions), + formatType, + false, + parallelism, + getMessageRouter(tableOptions, context.getClassLoader()).orElse(null)); + } + + @Override + public DynamicTableSource 
createDynamicTableSource(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + ReadableConfig tableOptions = helper.getOptions(); + + List<String> topics = generateTopic(context.getObjectIdentifier(), tableOptions); + if (topics != null && !topics.isEmpty()) { + ((Configuration) tableOptions).set(TOPIC, Collections.singletonList(topics.get(0))); + } + + // Generic Flink Table can reference multiple topics with topicPattern + String topicPattern = tableOptions.getOptional(TOPIC_PATTERN).orElse(null); + if (topicPattern != null && !topicPattern.isEmpty()) { + ((Configuration) tableOptions).set(TOPIC_PATTERN, topicPattern); + } + + final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat = + getKeyDecodingFormat(helper); + + final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = + getValueDecodingFormat(helper); + final String valueFormatPrefix = tableOptions.getOptional(FORMAT) + .orElse(tableOptions.get(VALUE_FORMAT)); + // Validate the option data type. + helper.validateExcept(PROPERTIES_PREFIX, + "type", "table-default-partitions", "default-database", valueFormatPrefix); + // Validate the option values. 
+ validateTableSourceOptions(tableOptions); + validateSinkMessageRouter(tableOptions); + validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat); + + Properties properties = getPulsarProperties(context.getCatalogTable().toProperties()); + + final PulsarTableOptions.StartupOptions startupOptions = PulsarTableOptions + .getStartupOptions(tableOptions); + + final DataType physicalDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType(); + + final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType); + + final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType); + + final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); + + String adminUrl = tableOptions.get(ADMIN_URL); + String serviceUrl = tableOptions.get(SERVICE_URL); + return createPulsarTableSource( + physicalDataType, + keyDecodingFormat.orElse(null), + valueDecodingFormat, + keyProjection, + valueProjection, + keyPrefix, + topics, + topicPattern, + serviceUrl, + adminUrl, + properties, + startupOptions); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(SERVICE_URL); + options.add(ADMIN_URL); + return options; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(FactoryUtil.FORMAT); + options.add(KEY_FORMAT); + options.add(KEY_FIELDS); + options.add(KEY_FIELDS_PREFIX); + options.add(VALUE_FORMAT); + options.add(VALUE_FIELDS_INCLUDE); + options.add(TOPIC); + options.add(TOPIC_PATTERN); + options.add(SCAN_STARTUP_MODE); + options.add(SCAN_STARTUP_SPECIFIC_OFFSETS); + options.add(SCAN_STARTUP_SUB_NAME); + options.add(SCAN_STARTUP_SUB_START_OFFSET); + options.add(GENERIC); + + 
options.add(PARTITION_DISCOVERY_INTERVAL_MILLIS); + options.add(SINK_SEMANTIC); + options.add(SINK_MESSAGE_ROUTER); + options.add(SINK_PARALLELISM); + options.add(PROPERTIES); + return options; + } + + // -------------------------------------------------------------------------------------------- + + private List<String> generateTopic(ObjectIdentifier table, ReadableConfig tableOptions) { + List<String> topics = null; + if (tableOptions.get(GENERIC)) { + topics = tableOptions.getOptional(TOPIC).orElse(null); + } else { + String rawTopic = table.getDatabaseName() + "/" + table.getObjectName(); + final String topic = TopicName.get(rawTopic).toString(); + topics = Collections.singletonList(topic); + } + + return topics; + } + + private static Optional<DecodingFormat<DeserializationSchema<RowData>>> getKeyDecodingFormat( + FactoryUtil.TableFactoryHelper helper) { + final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat = + helper.discoverOptionalDecodingFormat( + DeserializationFormatFactory.class, + KEY_FORMAT); + keyDecodingFormat.ifPresent(format -> { + if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) { + throw new ValidationException( + String.format( + "A key format should only deal with INSERT-only records. " + + "But %s has a changelog mode of %s.", + helper.getOptions().get(KEY_FORMAT), + format.getChangelogMode())); + } + }); + return keyDecodingFormat; + } + + private static Optional<EncodingFormat<SerializationSchema<RowData>>> getKeyEncodingFormat( + FactoryUtil.TableFactoryHelper helper) { + final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat = + helper.discoverOptionalEncodingFormat( + SerializationFormatFactory.class, + KEY_FORMAT); + keyEncodingFormat.ifPresent(format -> { + if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) { + throw new ValidationException( + String.format( + "A key format should only deal with INSERT-only records. 
" + + "But %s has a changelog mode of %s.", + helper.getOptions().get(KEY_FORMAT), + format.getChangelogMode())); + } + }); + return keyEncodingFormat; + } + + private static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat( + FactoryUtil.TableFactoryHelper helper) { + return helper.discoverOptionalDecodingFormat(DeserializationFormatFactory.class, FactoryUtil.FORMAT) + .orElseGet(() -> helper.discoverDecodingFormat(DeserializationFormatFactory.class, VALUE_FORMAT)); + } + + private static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat( + FactoryUtil.TableFactoryHelper helper) { + return helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, FactoryUtil.FORMAT) + .orElseGet(() -> helper.discoverEncodingFormat(SerializationFormatFactory.class, VALUE_FORMAT)); + } + + private static void validatePKConstraints(ObjectIdentifier tableName, CatalogTable catalogTable, Format format) { + if (catalogTable.getSchema().getPrimaryKey().isPresent() + && format.getChangelogMode().containsOnly(RowKind.INSERT)) { + Configuration options = Configuration.fromMap(catalogTable.getOptions()); + String formatName = options.getOptional(FactoryUtil.FORMAT).orElse(options.get(VALUE_FORMAT)); + throw new ValidationException(String.format( + "The Pulsar table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint" + + " on the table, because it can't guarantee the semantic of primary key.", + tableName.asSummaryString(), + formatName + )); + } + } + + // -------------------------------------------------------------------------------------------- + protected PulsarDynamicTableSource createPulsarTableSource( + DataType physicalDataType, + @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat, + DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat, + int[] keyProjection, + int[] valueProjection, + @Nullable String keyPrefix, + @Nullable List<String> topics, + @Nullable String 
topicPattern, + String serviceUrl, + String adminUrl, + Properties properties, + PulsarTableOptions.StartupOptions startupOptions) { + return new PulsarDynamicTableSource( + physicalDataType, + keyDecodingFormat, + valueDecodingFormat, + keyProjection, + valueProjection, + keyPrefix, + topics, + topicPattern, + serviceUrl, + adminUrl, + properties, + startupOptions, + false); + } +} diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/PulsarDynamicTableSink.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/PulsarDynamicTableSink.java new file mode 100644 index 000000000..4eb4ed47d --- /dev/null +++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/PulsarDynamicTableSink.java @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.flink.pulsar.table; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink; +import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils; +import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions; +import org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic; +import org.apache.flink.streaming.util.serialization.PulsarSerializationSchema; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.utils.DataTypeUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.api.MessageRouter; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; +import java.util.stream.Stream; + +/** + * pulsar dynamic table sink. 
+ */ +public class PulsarDynamicTableSink implements DynamicTableSink, SupportsWritingMetadata { + + // -------------------------------------------------------------------------------------------- + // Mutable attributes + // -------------------------------------------------------------------------------------------- + + /** Metadata that is appended at the end of a physical sink row. */ + protected List<String> metadataKeys; + + // -------------------------------------------------------------------------------------------- + // Format attributes + // -------------------------------------------------------------------------------------------- + + /** Data type to configure the formats. */ + protected final DataType physicalDataType; + /** + * The pulsar topic to write to. + */ + protected final String topic; + protected final String serviceUrl; + protected final String adminUrl; + + /** + * Properties for the pulsar producer. + */ + protected final Properties properties; + + /** Optional format for encoding keys to Pulsar. */ + protected final @Nullable + EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat; + /** + * Sink format for encoding records to pulsar. + */ + protected final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat; + + /** Indices that determine the key fields and the source position in the consumed row. */ + protected final int[] keyProjection; + + /** Indices that determine the value fields and the source position in the consumed row. */ + protected final int[] valueProjection; + + /** Prefix that needs to be removed from fields when constructing the physical data type. */ + protected final @Nullable + String keyPrefix; + + /** + * Flag to determine sink mode. In upsert mode sink transforms the delete/update-before message to + * tombstone message. + */ + protected final boolean upsertMode; + + /** Parallelism of the physical Pulsar producer. 
**/ + protected final @Nullable Integer parallelism; + + /** Sink commit semantic. */ + protected final PulsarSinkSemantic semantic; + + private final String formatType; + + private final MessageRouter messageRouter; + + protected PulsarDynamicTableSink( + String serviceUrl, + String adminUrl, + String topic, + DataType physicalDataType, + Properties properties, + @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat, + EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat, + int[] keyProjection, + int[] valueProjection, + @Nullable String keyPrefix, + PulsarSinkSemantic semantic, + String formatType, + boolean upsertMode, + @Nullable Integer parallelism, + @Nullable MessageRouter messageRouter) { + this.serviceUrl = Preconditions.checkNotNull(serviceUrl, "serviceUrl data type must not be null."); + this.adminUrl = Preconditions.checkNotNull(adminUrl, "adminUrl data type must not be null."); + this.topic = Preconditions.checkNotNull(topic, "Topic must not be null."); + this.physicalDataType = Preconditions.checkNotNull(physicalDataType, "Consumed data type must not be null."); + // Mutable attributes + this.metadataKeys = Collections.emptyList(); + this.properties = Preconditions.checkNotNull(properties, "Properties must not be null."); + this.keyEncodingFormat = keyEncodingFormat; + this.valueEncodingFormat = Preconditions.checkNotNull(valueEncodingFormat, "Encoding format must not be null."); + this.keyProjection = Preconditions.checkNotNull(keyProjection, "Key projection must not be null."); + this.valueProjection = Preconditions.checkNotNull(valueProjection, "Value projection must not be null."); + this.keyPrefix = keyPrefix; + this.semantic = Preconditions.checkNotNull(semantic, "Semantic must not be null."); + this.formatType = Preconditions.checkNotNull(formatType, "FormatType must not be null."); + this.upsertMode = upsertMode; + this.parallelism = parallelism; + this.messageRouter = messageRouter; + } + + @Override + public 
ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + return this.valueEncodingFormat.getChangelogMode(); + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + final SerializationSchema<RowData> keySerialization = + createSerialization(context, keyEncodingFormat, keyProjection, keyPrefix); + + final SerializationSchema<RowData> valueSerialization = + createSerialization(context, valueEncodingFormat, valueProjection, null); + + final PulsarSerializationSchema<RowData> pulsarSerializer = + createPulsarSerializer(keySerialization, valueSerialization); + + final SinkFunction<RowData> pulsarSink = createPulsarSink( + this.topic, + this.properties, + pulsarSerializer); + + return SinkFunctionProvider.of(pulsarSink, parallelism); + } + + private PulsarSerializationSchema<RowData> createPulsarSerializer(SerializationSchema<RowData> keySerialization, + SerializationSchema<RowData> valueSerialization) { + final List<LogicalType> physicalChildren = physicalDataType.getLogicalType().getChildren(); + + final RowData.FieldGetter[] keyFieldGetters = Arrays.stream(keyProjection) + .mapToObj(targetField -> RowData.createFieldGetter(physicalChildren.get(targetField), targetField)) + .toArray(RowData.FieldGetter[]::new); + + final RowData.FieldGetter[] valueFieldGetters = Arrays.stream(valueProjection) + .mapToObj(targetField -> RowData.createFieldGetter(physicalChildren.get(targetField), targetField)) + .toArray(RowData.FieldGetter[]::new); + + // determine the positions of metadata in the consumed row + final int[] metadataPositions = Stream.of(WritableMetadata.values()) + .mapToInt(m -> { + final int pos = metadataKeys.indexOf(m.key); + if (pos < 0) { + return -1; + } + return physicalChildren.size() + pos; + }) + .toArray(); + + // check if metadata is used at all + final boolean hasMetadata = metadataKeys.size() > 0; + + final long delayMilliseconds = Optional.ofNullable(this.properties + 
.getProperty(PulsarOptions.SEND_DELAY_MILLISECONDS, "0")) + .filter(StringUtils::isNumeric) + .map(Long::valueOf) + .orElse(0L); + + return new DynamicPulsarSerializationSchema( + keySerialization, + valueSerialization, + keyFieldGetters, + valueFieldGetters, + hasMetadata, + metadataPositions, + upsertMode, + DataTypeUtils.projectRow(physicalDataType, valueProjection), + formatType, + delayMilliseconds); + } + + private SinkFunction<RowData> createPulsarSink(String topic, Properties properties, + PulsarSerializationSchema<RowData> pulsarSerializer) { + final ClientConfigurationData configurationData = PulsarClientUtils + .newClientConf(serviceUrl, properties); + return new FlinkPulsarSink<RowData>( + adminUrl, + Optional.ofNullable(topic), + configurationData, + properties, + pulsarSerializer, + messageRouter, + PulsarSinkSemantic.valueOf(semantic.toString()) + ); + } + + public MessageRouter getMessageRouter() { + return messageRouter; + } + + private @Nullable + SerializationSchema<RowData> createSerialization( + DynamicTableSink.Context context, + @Nullable EncodingFormat<SerializationSchema<RowData>> format, + int[] projection, + @Nullable String prefix) { + if (format == null) { + return null; + } + DataType physicalFormatDataType = DataTypeUtils.projectRow(this.physicalDataType, projection); + if (prefix != null) { + physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix); + } + return format.createRuntimeEncoder(context, physicalFormatDataType); + } + + @Override + public DynamicTableSink copy() { + final PulsarDynamicTableSink copy = new PulsarDynamicTableSink( + this.serviceUrl, + this.adminUrl, + this.topic, + this.physicalDataType, + this.properties, + this.keyEncodingFormat, + this.valueEncodingFormat, + this.keyProjection, + this.valueProjection, + this.keyPrefix, + this.semantic, + this.formatType, + this.upsertMode, + this.parallelism, + this.messageRouter); + copy.metadataKeys = metadataKeys; + return copy; + } + + 
@Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof PulsarDynamicTableSink)) { + return false; + } + PulsarDynamicTableSink that = (PulsarDynamicTableSink) o; + return upsertMode == that.upsertMode && Objects.equals(metadataKeys, that.metadataKeys) + && Objects.equals(physicalDataType, that.physicalDataType) + && Objects.equals(topic, that.topic) && Objects.equals(serviceUrl, that.serviceUrl) + && Objects.equals(adminUrl, that.adminUrl) + && Objects.equals(properties, that.properties) + && Objects.equals(keyEncodingFormat, that.keyEncodingFormat) + && Objects.equals(valueEncodingFormat, that.valueEncodingFormat) + && Arrays.equals(keyProjection, that.keyProjection) + && Arrays.equals(valueProjection, that.valueProjection) + && Objects.equals(keyPrefix, that.keyPrefix) + && Objects.equals(parallelism, that.parallelism) && semantic == that.semantic + && Objects.equals(formatType, that.formatType) + && Objects.equals(messageRouter, that.messageRouter); + } + + @Override + public int hashCode() { + int result = + Objects.hash(metadataKeys, physicalDataType, topic, serviceUrl, adminUrl, properties, keyEncodingFormat, + valueEncodingFormat, keyPrefix, upsertMode, parallelism, semantic, formatType, messageRouter); + result = 31 * result + Arrays.hashCode(keyProjection); + result = 31 * result + Arrays.hashCode(valueProjection); + return result; + } + + @Override + public String asSummaryString() { + return "Pulsar dynamic table sink"; + } + + @Override + public Map<String, DataType> listWritableMetadata() { + final Map<String, DataType> metadataMap = new LinkedHashMap<>(); + Stream.of(WritableMetadata.values()).forEachOrdered(m -> metadataMap.put(m.key, m.dataType)); + return metadataMap; + } + + @Override + public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) { + this.metadataKeys = metadataKeys; + } + + enum WritableMetadata { + + PROPERTIES( + "properties", + // key and value of the 
map are nullable to make handling easier in queries + DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable()).nullable(), + (row, pos) -> { + if (row.isNullAt(pos)) { + return null; + } + final MapData map = row.getMap(pos); + final ArrayData keyArray = map.keyArray(); + final ArrayData valueArray = map.valueArray(); + + final Properties properties = new Properties(); + for (int i = 0; i < keyArray.size(); i++) { + if (!keyArray.isNullAt(i) && !valueArray.isNullAt(i)) { + final String key = keyArray.getString(i).toString(); + final String value = valueArray.getString(i).toString(); + properties.put(key, value); + } + } + return properties; + } + ), + + EVENT_TIME( + "eventTime", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(), + (row, pos) -> { + if (row.isNullAt(pos)) { + return null; + } + return row.getTimestamp(pos, 3).getMillisecond(); + }); + final String key; + + final DataType dataType; + + final DynamicPulsarSerializationSchema.MetadataConverter converter; + + WritableMetadata(String key, DataType dataType, DynamicPulsarSerializationSchema.MetadataConverter converter) { + this.key = key; + this.dataType = dataType; + this.converter = converter; + } + } +} + diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/UpsertPulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/UpsertPulsarDynamicTableFactory.java new file mode 100644 index 000000000..9b853ffd6 --- /dev/null +++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/UpsertPulsarDynamicTableFactory.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.flink.pulsar.table; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.connectors.pulsar.config.StartupMode; +import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource; +import org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic; +import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions; +import org.apache.flink.streaming.connectors.pulsar.util.KeyHashMessageRouterImpl; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.format.Format; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import 
org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; + +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.ADMIN_URL; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FIELDS; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FIELDS_PREFIX; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FORMAT; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.PROPERTIES; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SERVICE_URL; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.TOPIC; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.TOPIC_PATTERN; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.VALUE_FIELDS_INCLUDE; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.VALUE_FORMAT; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.createKeyFormatProjection; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.createValueFormatProjection; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.getPulsarProperties; +import static org.apache.flink.table.factories.FactoryUtil.FORMAT; + +/** + * Upsert-Pulsar factory. 
+ */ +public class UpsertPulsarDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { + + public static final String IDENTIFIER = "upsert-pulsar-inlong"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(ADMIN_URL); + options.add(SERVICE_URL); + options.add(TOPIC); + options.add(KEY_FORMAT); + options.add(VALUE_FORMAT); + return options; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(KEY_FIELDS_PREFIX); + options.add(VALUE_FIELDS_INCLUDE); + options.add(FactoryUtil.SINK_PARALLELISM); + options.add(PROPERTIES); + return options; + } + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + + ReadableConfig tableOptions = helper.getOptions(); + DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat = helper.discoverDecodingFormat( + DeserializationFormatFactory.class, + KEY_FORMAT); + DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = helper.discoverDecodingFormat( + DeserializationFormatFactory.class, + VALUE_FORMAT); + + // Validate the option data type. 
+ final String valueFormatPrefix = tableOptions.getOptional(FORMAT) + .orElse(tableOptions.get(VALUE_FORMAT)); + helper.validateExcept(PulsarTableOptions.PROPERTIES_PREFIX, valueFormatPrefix); + TableSchema schema = context.getCatalogTable().getSchema(); + validateTableOptions(tableOptions, keyDecodingFormat, valueDecodingFormat, schema); + + Tuple2<int[], int[]> keyValueProjections = createKeyValueProjections(context.getCatalogTable()); + String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); + Properties properties = getPulsarProperties(context.getCatalogTable().toProperties()); + + // always use earliest to keep data integrity + PulsarTableOptions.StartupOptions startupOptions = new PulsarTableOptions.StartupOptions(); + startupOptions.startupMode = StartupMode.EARLIEST; + startupOptions.specificOffsets = Collections.EMPTY_MAP; + + String adminUrl = tableOptions.get(ADMIN_URL); + String serverUrl = tableOptions.get(SERVICE_URL); + List<String> topics = tableOptions.get(TOPIC); + String topicPattern = tableOptions.get(TOPIC_PATTERN); + return new PulsarDynamicTableSource( + schema.toPhysicalRowDataType(), + keyDecodingFormat, + new DecodingFormatWrapper(valueDecodingFormat), + keyValueProjections.f0, + keyValueProjections.f1, + keyPrefix, + topics, + topicPattern, + serverUrl, + adminUrl, + properties, + startupOptions, + true); + } + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + + ReadableConfig tableOptions = helper.getOptions(); + + EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat = helper.discoverEncodingFormat( + SerializationFormatFactory.class, + KEY_FORMAT); + EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = helper.discoverEncodingFormat( + SerializationFormatFactory.class, + VALUE_FORMAT); + + // Validate the option data type. 
+ helper.validateExcept(PulsarTableOptions.PROPERTIES_PREFIX); + TableSchema schema = context.getCatalogTable().getSchema(); + validateTableOptions(tableOptions, keyEncodingFormat, valueEncodingFormat, schema); + + Tuple2<int[], int[]> keyValueProjections = createKeyValueProjections(context.getCatalogTable()); + final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); + Properties properties = getPulsarProperties(context.getCatalogTable().toProperties()); + Integer parallelism = tableOptions.get(FactoryUtil.SINK_PARALLELISM); + + String adminUrl = tableOptions.get(ADMIN_URL); + String serverUrl = tableOptions.get(SERVICE_URL); + String formatType = tableOptions.getOptional(FORMAT).orElseGet(() -> tableOptions.get(VALUE_FORMAT)); + + return new PulsarDynamicTableSink( + serverUrl, + adminUrl, + tableOptions.get(TOPIC).get(0), + schema.toPhysicalRowDataType(), + properties, + keyEncodingFormat, + new EncodingFormatWrapper(valueEncodingFormat), + keyValueProjections.f0, + keyValueProjections.f1, + keyPrefix, + PulsarSinkSemantic.AT_LEAST_ONCE, + formatType, + true, + parallelism, + KeyHashMessageRouterImpl.INSTANCE); + } + + private Tuple2<int[], int[]> createKeyValueProjections(CatalogTable catalogTable) { + TableSchema schema = catalogTable.getSchema(); + // primary key should validated earlier + List<String> keyFields = schema.getPrimaryKey().get().getColumns(); + DataType physicalDataType = schema.toPhysicalRowDataType(); + + Configuration tableOptions = Configuration.fromMap(catalogTable.getOptions()); + // upsert-pulsar will set key.fields to primary key fields by default + tableOptions.set(KEY_FIELDS, keyFields); + + int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType); + int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType); + + return Tuple2.of(keyProjection, valueProjection); + } + + // 
-------------------------------------------------------------------------------------------- + // Validation + // -------------------------------------------------------------------------------------------- + + private static void validateTableOptions( + ReadableConfig tableOptions, + Format keyFormat, + Format valueFormat, + TableSchema schema) { + validateTopic(tableOptions); + validateFormat(keyFormat, valueFormat, tableOptions); + validatePKConstraints(schema); + } + + private static void validateTopic(ReadableConfig tableOptions) { + List<String> topic = tableOptions.get(TOPIC); + if (topic.size() > 1) { + throw new ValidationException("The 'upsert-pulsar' connector doesn't support topic list now. " + + "Please use single topic as the value of the parameter 'topic'."); + } + } + + private static void validateFormat(Format keyFormat, Format valueFormat, ReadableConfig tableOptions) { + if (!keyFormat.getChangelogMode().containsOnly(RowKind.INSERT)) { + String identifier = tableOptions.get(KEY_FORMAT); + throw new ValidationException(String.format( + "'upsert-pulsar' connector doesn't support '%s' as key format, " + + "because '%s' is not in insert-only mode.", + identifier, + identifier)); + } + if (!valueFormat.getChangelogMode().containsOnly(RowKind.INSERT)) { + String identifier = tableOptions.get(VALUE_FORMAT); + throw new ValidationException(String.format( + "'upsert-Pulsar' connector doesn't support '%s' as value format, " + + "because '%s' is not in insert-only mode.", + identifier, + identifier)); + } + } + + private static void validatePKConstraints(TableSchema schema) { + if (!schema.getPrimaryKey().isPresent()) { + throw new ValidationException("'upsert-pulsar' tables require to define a PRIMARY KEY constraint. " + + "The PRIMARY KEY specifies which columns should be " + + "read from or write to the Pulsar message key. 
" + + "The PRIMARY KEY also defines records in the 'upsert-pulsar' table " + + "should update or delete on which keys."); + } + } + + // -------------------------------------------------------------------------------------------- + // Format wrapper + // -------------------------------------------------------------------------------------------- + + /** + * It is used to wrap the decoding format and expose the desired changelog mode. It's only works + * for insert-only format. + */ + protected static class DecodingFormatWrapper implements DecodingFormat<DeserializationSchema<RowData>> { + private final DecodingFormat<DeserializationSchema<RowData>> innerDecodingFormat; + + private static final ChangelogMode SOURCE_CHANGELOG_MODE = ChangelogMode.newBuilder() + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .build(); + + public DecodingFormatWrapper(DecodingFormat<DeserializationSchema<RowData>> innerDecodingFormat) { + this.innerDecodingFormat = innerDecodingFormat; + } + + @Override + public DeserializationSchema<RowData> createRuntimeDecoder( + DynamicTableSource.Context context, DataType producedDataType) { + return innerDecodingFormat.createRuntimeDecoder(context, producedDataType); + } + + @Override + public ChangelogMode getChangelogMode() { + return SOURCE_CHANGELOG_MODE; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + DecodingFormatWrapper that = (DecodingFormatWrapper) obj; + return Objects.equals(innerDecodingFormat, that.innerDecodingFormat); + } + + @Override + public int hashCode() { + return Objects.hash(innerDecodingFormat); + } + } + + /** + * It is used to wrap the encoding format and expose the desired changelog mode. It's only works + * for insert-only format. 
+ */ + protected static class EncodingFormatWrapper implements EncodingFormat<SerializationSchema<RowData>> { + private final EncodingFormat<SerializationSchema<RowData>> innerEncodingFormat; + + public static final ChangelogMode SINK_CHANGELOG_MODE = ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .build(); + + public EncodingFormatWrapper(EncodingFormat<SerializationSchema<RowData>> innerEncodingFormat) { + this.innerEncodingFormat = innerEncodingFormat; + } + + @Override + public SerializationSchema<RowData> createRuntimeEncoder( + DynamicTableSink.Context context, DataType consumedDataType) { + return innerEncodingFormat.createRuntimeEncoder(context, consumedDataType); + } + + @Override + public ChangelogMode getChangelogMode() { + return SINK_CHANGELOG_MODE; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + EncodingFormatWrapper that = (EncodingFormatWrapper) obj; + return Objects.equals(innerEncodingFormat, that.innerEncodingFormat); + } + + @Override + public int hashCode() { + return Objects.hash(innerEncodingFormat); + } + } +} + diff --git a/inlong-sort/sort-connectors/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory index 9719b692b..83e2d7a4d 100644 --- a/inlong-sort/sort-connectors/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory +++ b/inlong-sort/sort-connectors/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory @@ -13,4 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-org.apache.inlong.sort.flink.kafka.KafkaDynamicTableFactory \ No newline at end of file +org.apache.inlong.sort.flink.kafka.KafkaDynamicTableFactory +org.apache.inlong.sort.flink.pulsar.table.PulsarDynamicTableFactory \ No newline at end of file diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/PulsarSqlParserTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/PulsarSqlParserTest.java new file mode 100644 index 000000000..6ae3b554a --- /dev/null +++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/PulsarSqlParserTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.singletenant.flink.parser; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.inlong.sort.formats.common.LongFormatInfo; +import org.apache.inlong.sort.formats.common.StringFormatInfo; +import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.GroupInfo; +import org.apache.inlong.sort.protocol.StreamInfo; +import org.apache.inlong.sort.protocol.node.Node; +import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode; +import org.apache.inlong.sort.protocol.node.format.CsvFormat; +import org.apache.inlong.sort.protocol.node.format.Format; +import org.apache.inlong.sort.protocol.node.format.JsonFormat; +import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode; +import org.apache.inlong.sort.protocol.transformation.FieldRelationShip; +import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip; +import org.apache.inlong.sort.singletenant.flink.parser.impl.FlinkSqlParser; +import org.apache.inlong.sort.singletenant.flink.parser.result.FlinkSqlParseResult; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public class PulsarSqlParserTest { + + private KafkaLoadNode buildKafkaLoadNode() { + List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()), + new FieldInfo("name", new StringFormatInfo())); + List<FieldRelationShip> relations = Arrays + .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()), + new FieldInfo("id", new LongFormatInfo())), + new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()), + new FieldInfo("name", new StringFormatInfo()))); + return new KafkaLoadNode("1", "kafka_output", fields, relations, null, 
null, + "workerJson", "localhost:9092", + new JsonFormat(), null, + null, null); + } + + public PulsarExtractNode buildPulsarExtractNode() { + List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()), + new FieldInfo("name", new StringFormatInfo())); + Format format = new CsvFormat(); + return new PulsarExtractNode("2", + "pulsar_input", + fields, + null, + null, + "persistent://public/default/test_stream", + "http://localhost:8080", + "pulsar://localhost:6650", + format, + "earliest", + null); + } + + private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) { + List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList()); + List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList()); + return new NodeRelationShip(inputIds, outputIds); + } + + @Test + public void testPulsar() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.enableCheckpointing(10000); + env.disableOperatorChaining(); + EnvironmentSettings settings = EnvironmentSettings + .newInstance() + .useBlinkPlanner() + .inStreamingMode() + .build(); + StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); + Node inputNode = buildPulsarExtractNode(); + Node outputNode = buildKafkaLoadNode(); + StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode), + Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode), + Collections.singletonList(outputNode)))); + GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo)); + FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo); + FlinkSqlParseResult result = parser.parse(); + Assert.assertTrue(result.tryExecute()); + } +} diff --git a/inlong-sort/sort-connectors/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory 
b/inlong-sort/sort-single-tenant/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory similarity index 78% copy from inlong-sort/sort-connectors/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory copy to inlong-sort/sort-single-tenant/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory index 9719b692b..53bd06a1f 100644 --- a/inlong-sort/sort-connectors/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory +++ b/inlong-sort/sort-single-tenant/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -13,4 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.inlong.sort.flink.kafka.KafkaDynamicTableFactory \ No newline at end of file +org.apache.inlong.sort.formats.inlongmsg.InLongMsgFormatFactory +org.apache.inlong.sort.flink.kafka.KafkaDynamicTableFactory +org.apache.inlong.sort.flink.pulsar.table.PulsarDynamicTableFactory diff --git a/licenses/inlong-sort/LICENSE b/licenses/inlong-sort/LICENSE index 22c6de5de..1c3f36a03 100644 --- a/licenses/inlong-sort/LICENSE +++ b/licenses/inlong-sort/LICENSE @@ -473,6 +473,12 @@ Source : flink-connector-hive 1.13.5 (Please note that the software have been modified.) License : https://github.com/apache/flink/blob/master/NOTICE +1.3.7 inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/flink/pulsar/table/DynamicPulsarSerializationSchema.java + inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/flink/pulsar/table/PulsarDynamicTableFactory.java + inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/flink/pulsar/table/PulsarDynamicTableSink.java + inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/flink/pulsar/table/UpsertPulsarDynamicTableFactory.java + Source : pulsar-flink-connector_2.11 1.13.6.1-rc9 (Please note that the software have been modified.) 
+ License : https://github.com/streamnative/pulsar-flink/blob/master/LICENSE ======================================================================= Apache InLong(Incubating) Subcomponents: @@ -666,6 +672,7 @@ The text of each license is the standard Apache 2.0 license. xerces:xercesImpl:2.12.0 - Xerces2 Java Parser (http://xerces.apache.org/xerces2-j), (The Apache License, Version 2.0) org.apache.zookeeper:zookeeper:3.6.3 - Apache ZooKeeper - Server (https://github.com/apache/zookeeper/tree/release-3.6.3/zookeeper-server), (Apache License, Version 2.0) org.apache.zookeeper:zookeeper-jute:3.6.3 - Apache ZooKeeper - Jute (https://github.com/apache/zookeeper/tree/release-3.6.3/zookeeper-jute), (Apache License, Version 2.0) + io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9 - StreamNative :: Pulsar Flink Connector :: Scala 2.11 (https://github.com/streamnative/pulsar-flink/blob/release-1.13/LICENSE), (Apache License, Version 2.0) ======================================================================== diff --git a/pom.xml b/pom.xml index 91c393601..cd5514fe1 100644 --- a/pom.xml +++ b/pom.xml @@ -198,6 +198,7 @@ <flink.scala.binary.version>2.11</flink.scala.binary.version> <flink.jackson.version>2.12.1-13.0</flink.jackson.version> <flink.connector.postgres.cdc.version>2.0.2</flink.connector.postgres.cdc.version> + <flink.pulsar.version>1.13.6.1-rc9</flink.pulsar.version> <qcloud.flink.cos.fs.hadoop.version>1.10.0-0.1.10</qcloud.flink.cos.fs.hadoop.version> <qcloud.chdfs.version>2.5</qcloud.chdfs.version> @@ -977,6 +978,12 @@ <version>${flink.version}</version> </dependency> + <dependency> + <groupId>io.streamnative.connectors</groupId> + <artifactId>pulsar-flink-connector_${scala.binary.version}</artifactId> + <version>${flink.pulsar.version}</version> + </dependency> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-test-utils-junit</artifactId>
