This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit bbd7c6d46c29e5f298f08a281775c79cc63a7149 Author: SteNicholas <[email protected]> AuthorDate: Thu Aug 19 08:40:26 2021 +0800 [#780] Support the RocketMQ TableSink based on the legacy Sink implementation (#781) --- .../flink/{source => }/common/RocketMQOptions.java | 39 +++- .../apache/rocketmq/flink/legacy/RocketMQSink.java | 21 +- .../flink/sink/table/RocketMQDynamicTableSink.java | 257 +++++++++++++++++++++ .../table/RocketMQDynamicTableSinkFactory.java | 174 ++++++++++++++ .../flink/sink/table/RocketMQRowDataConverter.java | 217 +++++++++++++++++ .../flink/sink/table/RocketMQRowDataSink.java | 61 +++++ .../table/RocketMQDynamicTableSourceFactory.java | 32 +-- .../flink/source/util/StringSerializer.java | 2 +- .../org.apache.flink.table.factories.Factory | 3 +- .../RocketMQDynamicTableSinkFactoryTest.java} | 56 +++-- .../RocketMQDynamicTableSourceFactoryTest.java | 2 +- 11 files changed, 797 insertions(+), 67 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java similarity index 61% rename from src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java rename to src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java index 000e090..b34826d 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java +++ b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.rocketmq.flink.source.common; +package org.apache.rocketmq.flink.common; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; @@ -24,19 +24,23 @@ import org.apache.flink.configuration.ConfigOptions; /** Includes config options of RocketMQ connector type. */ public class RocketMQOptions { - public static final ConfigOption<String> TOPIC = ConfigOptions.key("topic").noDefaultValue(); + public static final ConfigOption<String> TOPIC = + ConfigOptions.key("topic").stringType().noDefaultValue(); public static final ConfigOption<String> CONSUMER_GROUP = - ConfigOptions.key("consumerGroup").noDefaultValue(); + ConfigOptions.key("consumerGroup").stringType().noDefaultValue(); + + public static final ConfigOption<String> PRODUCER_GROUP = + ConfigOptions.key("producerGroup").stringType().noDefaultValue(); public static final ConfigOption<String> NAME_SERVER_ADDRESS = - ConfigOptions.key("nameServerAddress").noDefaultValue(); + ConfigOptions.key("nameServerAddress").stringType().noDefaultValue(); public static final ConfigOption<String> OPTIONAL_TAG = - ConfigOptions.key("tag").noDefaultValue(); + ConfigOptions.key("tag").stringType().noDefaultValue(); public static final ConfigOption<Integer> OPTIONAL_START_MESSAGE_OFFSET = - ConfigOptions.key("startMessageOffset").defaultValue(-1); + ConfigOptions.key("startMessageOffset").intType().defaultValue(-1); public static final ConfigOption<Long> OPTIONAL_START_TIME_MILLS = ConfigOptions.key("startTimeMs".toLowerCase()).longType().defaultValue(-1L); @@ -45,7 +49,7 @@ public class RocketMQOptions { ConfigOptions.key("startTime".toLowerCase()).stringType().noDefaultValue(); public static final ConfigOption<String> OPTIONAL_END_TIME = - ConfigOptions.key("endTime").noDefaultValue(); + ConfigOptions.key("endTime").stringType().noDefaultValue(); public static final ConfigOption<String> OPTIONAL_TIME_ZONE = ConfigOptions.key("timeZone".toLowerCase()).stringType().noDefaultValue(); @@ -70,4 +74,25 @@ public class RocketMQOptions { public static final ConfigOption<String> OPTIONAL_LENGTH_CHECK = ConfigOptions.key("lengthCheck").stringType().defaultValue("NONE"); + + public static final ConfigOption<Integer> OPTIONAL_WRITE_RETRY_TIMES = + ConfigOptions.key("retryTimes").intType().defaultValue(10); + + public static final ConfigOption<Long> OPTIONAL_WRITE_SLEEP_TIME_MS = + ConfigOptions.key("sleepTimeMs").longType().defaultValue(5000L); + + public static final ConfigOption<Boolean> OPTIONAL_WRITE_IS_DYNAMIC_TAG = + ConfigOptions.key("isDynamicTag").booleanType().defaultValue(false); + + public static final ConfigOption<String> OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN = + ConfigOptions.key("dynamicTagColumn").stringType().noDefaultValue(); + + public static final ConfigOption<Boolean> OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN_WRITE_INCLUDED = + ConfigOptions.key("dynamicTagColumnWriteIncluded").booleanType().defaultValue(true); + + public static final ConfigOption<String> OPTIONAL_WRITE_KEY_COLUMNS = + ConfigOptions.key("keyColumns").stringType().noDefaultValue(); + + public static final ConfigOption<Boolean> OPTIONAL_WRITE_KEYS_TO_BODY = + ConfigOptions.key("writeKeysToBody").booleanType().defaultValue(false); } diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java index b6e1793..328880b 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java @@ -47,7 +47,7 @@ import java.util.UUID; * batchFlushOnCheckpoint(true) is set. Otherwise, the sink reliability guarantees depends on * rocketmq producer's retry policy. */ -public class RocketMQSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction { +public class RocketMQSink extends RichSinkFunction<Message> implements CheckpointedFunction { private static final long serialVersionUID = 1L; @@ -104,12 +104,11 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint } @Override - public void invoke(IN input, Context context) throws Exception { + public void invoke(Message input, Context context) throws Exception { sinkInTps.markEvent(); - Message msg = (Message) input; if (batchFlushOnCheckpoint) { - batchList.add(msg); + batchList.add(input); if (batchList.size() >= batchSize) { flushSync(); } @@ -120,7 +119,7 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint if (async) { try { producer.send( - msg, + input, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { @@ -128,7 +127,7 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint long end = System.currentTimeMillis(); latencyGauge.report(end - timeStartWriting, 1); outTps.markEvent(); - outBps.markEvent(msg.getBody().length); + outBps.markEvent(input.getBody().length); } @Override @@ -143,7 +142,7 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint } } else { try { - SendResult result = producer.send(msg); + SendResult result = producer.send(input); LOG.debug("Sync send message result: {}", result); if (result.getSendStatus() != SendStatus.SEND_OK) { throw new RemotingException(result.toString()); @@ -151,7 +150,7 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint long end = System.currentTimeMillis(); latencyGauge.report(end - timeStartWriting, 1); outTps.markEvent(); - outBps.markEvent(msg.getBody().length); + outBps.markEvent(input.getBody().length); } catch (Exception e) { LOG.error("Sync send message exception: ", e); throw e; @@ -159,17 +158,17 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint } } - public RocketMQSink<IN> withAsync(boolean async) { + public RocketMQSink withAsync(boolean async) { this.async = async; return this; } - public RocketMQSink<IN> withBatchFlushOnCheckpoint(boolean batchFlushOnCheckpoint) { + public RocketMQSink withBatchFlushOnCheckpoint(boolean batchFlushOnCheckpoint) { this.batchFlushOnCheckpoint = batchFlushOnCheckpoint; return this; } - public RocketMQSink<IN> withBatchSize(int batchSize) { + public RocketMQSink withBatchSize(int batchSize) { this.batchSize = batchSize; return this; } diff --git a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java new file mode 100644 index 0000000..9839888 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.sink.table; + +import org.apache.rocketmq.flink.legacy.RocketMQConfig; +import org.apache.rocketmq.flink.legacy.RocketMQSink; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter; +import org.apache.flink.types.RowKind; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Stream; + +import static org.apache.rocketmq.flink.sink.table.RocketMQRowDataConverter.MetadataConverter; + +/** Defines the dynamic table sink of RocketMQ. */ +public class RocketMQDynamicTableSink implements DynamicTableSink, SupportsWritingMetadata { + + private final DescriptorProperties properties; + private final TableSchema schema; + + private final String topic; + private final String producerGroup; + private final String nameServerAddress; + private final String tag; + private final String dynamicColumn; + private final String fieldDelimiter; + private final String encoding; + + private final long retryTimes; + private final long sleepTime; + + private final boolean isDynamicTag; + private final boolean isDynamicTagIncluded; + private final boolean writeKeysToBody; + + private final String[] keyColumns; + + private List<String> metadataKeys; + + public RocketMQDynamicTableSink( + DescriptorProperties properties, + TableSchema schema, + String topic, + String producerGroup, + String nameServerAddress, + String tag, + String dynamicColumn, + String fieldDelimiter, + String encoding, + long retryTimes, + long sleepTime, + boolean isDynamicTag, + boolean isDynamicTagIncluded, + boolean writeKeysToBody, + String[] keyColumns) { + this.properties = properties; + this.schema = schema; + this.topic = topic; + this.producerGroup = producerGroup; + this.nameServerAddress = nameServerAddress; + this.tag = tag; + this.dynamicColumn = dynamicColumn; + this.fieldDelimiter = fieldDelimiter; + this.encoding = encoding; + this.retryTimes = retryTimes; + this.sleepTime = sleepTime; + this.isDynamicTag = isDynamicTag; + this.isDynamicTagIncluded = isDynamicTagIncluded; + this.writeKeysToBody = writeKeysToBody; + this.keyColumns = keyColumns; + this.metadataKeys = Collections.emptyList(); + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + ChangelogMode.Builder builder = ChangelogMode.newBuilder(); + for (RowKind kind : requestedMode.getContainedKinds()) { + if (kind != RowKind.UPDATE_BEFORE) { + builder.addContainedKind(kind); + } + } + return builder.build(); + } + + @Override + public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider( + DynamicTableSink.Context context) { + return SinkFunctionProvider.of(new RocketMQRowDataSink(createSink(), createConverter())); + } + + @Override + public Map<String, DataType> listWritableMetadata() { + final Map<String, DataType> metadataMap = new LinkedHashMap<>(); + Stream.of(WritableMetadata.values()) + .forEachOrdered(m -> metadataMap.put(m.key, m.dataType)); + return metadataMap; + } + + @Override + public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) { + this.metadataKeys = metadataKeys; + } + + @Override + public DynamicTableSink copy() { + RocketMQDynamicTableSink tableSink = + new RocketMQDynamicTableSink( + properties, + schema, + topic, + producerGroup, + nameServerAddress, + tag, + dynamicColumn, + fieldDelimiter, + encoding, + retryTimes, + sleepTime, + isDynamicTag, + isDynamicTagIncluded, + writeKeysToBody, + keyColumns); + tableSink.metadataKeys = metadataKeys; + return tableSink; + } + + @Override + public String asSummaryString() { + return RocketMQDynamicTableSink.class.getName(); + } + + private RocketMQSink createSink() { + return new RocketMQSink(getProducerProps()); + } + + private RocketMQRowDataConverter createConverter() { + final int[] metadataPositions = + Stream.of(WritableMetadata.values()) + .mapToInt( + m -> { + final int pos = metadataKeys.indexOf(m.key); + if (pos < 0) { + return -1; + } + return schema.getFieldCount() + pos; + }) + .toArray(); + return new RocketMQRowDataConverter( + topic, + tag, + dynamicColumn, + fieldDelimiter, + encoding, + isDynamicTag, + isDynamicTagIncluded, + writeKeysToBody, + keyColumns, + convertToRowTypeInfo(schema.toRowDataType()), + schema.getFieldDataTypes(), + metadataKeys.size() > 0, + metadataPositions); + } + + private Properties getProducerProps() { + Properties producerProps = new Properties(); + producerProps.setProperty(RocketMQConfig.PRODUCER_GROUP, producerGroup); + producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameServerAddress); + return producerProps; + } + + protected static RowTypeInfo convertToRowTypeInfo(DataType fieldsDataType) { + final TypeInformation<?>[] fieldTypes = + fieldsDataType.getChildren().stream() + .map(LegacyTypeInfoDataTypeConverter::toLegacyTypeInfo) + .toArray(TypeInformation[]::new); + return new RowTypeInfo(fieldTypes); + } + + // -------------------------------------------------------------------------------------------- + // Metadata handling + // -------------------------------------------------------------------------------------------- + + enum WritableMetadata { + KEYS( + "keys", + DataTypes.STRING().nullable(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(RowData row, int pos) { + if (row.isNullAt(pos)) { + return null; + } + return row.getString(pos).toString(); + } + }), + + TAGS( + "tags", + DataTypes.STRING().nullable(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(RowData row, int pos) { + if (row.isNullAt(pos)) { + return null; + } + return row.getString(pos).toString(); + } + }); + + final String key; + + final DataType dataType; + + final MetadataConverter converter; + + WritableMetadata(String key, DataType dataType, MetadataConverter converter) { + this.key = key; + this.dataType = dataType; + this.converter = converter; + } + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSinkFactory.java b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSinkFactory.java new file mode 100644 index 0000000..72d29d5 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSinkFactory.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.sink.table; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.Factory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.utils.TableSchemaUtils; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper; +import static org.apache.rocketmq.flink.common.RocketMQOptions.NAME_SERVER_ADDRESS; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_ENCODING; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_FIELD_DELIMITER; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_TAG; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN_WRITE_INCLUDED; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_WRITE_IS_DYNAMIC_TAG; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_WRITE_KEYS_TO_BODY; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_WRITE_KEY_COLUMNS; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_WRITE_RETRY_TIMES; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_WRITE_SLEEP_TIME_MS; +import static org.apache.rocketmq.flink.common.RocketMQOptions.PRODUCER_GROUP; +import static org.apache.rocketmq.flink.common.RocketMQOptions.TOPIC; + +/** + * Defines the {@link DynamicTableSinkFactory} implementation to create {@link + * RocketMQDynamicTableSink}. + */ +public class RocketMQDynamicTableSinkFactory implements DynamicTableSinkFactory { + + @Override + public String factoryIdentifier() { + return "rocketmq"; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + Set<ConfigOption<?>> requiredOptions = new HashSet<>(); + requiredOptions.add(TOPIC); + requiredOptions.add(PRODUCER_GROUP); + requiredOptions.add(NAME_SERVER_ADDRESS); + return requiredOptions; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + Set<ConfigOption<?>> optionalOptions = new HashSet<>(); + optionalOptions.add(OPTIONAL_TAG); + optionalOptions.add(OPTIONAL_WRITE_RETRY_TIMES); + optionalOptions.add(OPTIONAL_WRITE_SLEEP_TIME_MS); + optionalOptions.add(OPTIONAL_WRITE_IS_DYNAMIC_TAG); + optionalOptions.add(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN); + optionalOptions.add(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN_WRITE_INCLUDED); + optionalOptions.add(OPTIONAL_WRITE_KEYS_TO_BODY); + optionalOptions.add(OPTIONAL_WRITE_KEY_COLUMNS); + optionalOptions.add(OPTIONAL_ENCODING); + optionalOptions.add(OPTIONAL_FIELD_DELIMITER); + return optionalOptions; + } + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + transformContext(this, context); + FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context); + helper.validate(); + Map<String, String> rawProperties = context.getCatalogTable().getOptions(); + Configuration properties = Configuration.fromMap(rawProperties); + String topicName = properties.getString(TOPIC); + String producerGroup = properties.getString(PRODUCER_GROUP); + String nameServerAddress = properties.getString(NAME_SERVER_ADDRESS); + String tag = properties.getString(OPTIONAL_TAG); + String dynamicColumn = properties.getString(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN); + String encoding = properties.getString(OPTIONAL_ENCODING); + String fieldDelimiter = properties.getString(OPTIONAL_FIELD_DELIMITER); + int retryTimes = properties.getInteger(OPTIONAL_WRITE_RETRY_TIMES); + long sleepTimeMs = properties.getLong(OPTIONAL_WRITE_SLEEP_TIME_MS); + boolean isDynamicTag = properties.getBoolean(OPTIONAL_WRITE_IS_DYNAMIC_TAG); + boolean isDynamicTagIncluded = + properties.getBoolean(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN_WRITE_INCLUDED); + boolean writeKeysToBody = properties.getBoolean(OPTIONAL_WRITE_KEYS_TO_BODY); + String keyColumnsConfig = properties.getString(OPTIONAL_WRITE_KEY_COLUMNS); + String[] keyColumns = new String[0]; + if (keyColumnsConfig != null && keyColumnsConfig.length() > 0) { + keyColumns = keyColumnsConfig.split(","); + } + DescriptorProperties descriptorProperties = new DescriptorProperties(); + descriptorProperties.putProperties(rawProperties); + TableSchema physicalSchema = + TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); + return new RocketMQDynamicTableSink( + descriptorProperties, + physicalSchema, + topicName, + producerGroup, + nameServerAddress, + tag, + dynamicColumn, + fieldDelimiter, + encoding, + sleepTimeMs, + retryTimes, + isDynamicTag, + isDynamicTagIncluded, + writeKeysToBody, + keyColumns); + } + + private void transformContext( + DynamicTableFactory factory, DynamicTableFactory.Context context) { + Map<String, String> catalogOptions = context.getCatalogTable().getOptions(); + Map<String, String> convertedOptions = + normalizeOptionCaseAsFactory(factory, catalogOptions); + catalogOptions.clear(); + for (Map.Entry<String, String> entry : convertedOptions.entrySet()) { + catalogOptions.put(entry.getKey(), entry.getValue()); + } + } + + private Map<String, String> normalizeOptionCaseAsFactory( + Factory factory, Map<String, String> options) { + Map<String, String> normalizedOptions = new HashMap<>(); + Map<String, String> requiredOptionKeysLowerCaseToOriginal = + factory.requiredOptions().stream() + .collect( + Collectors.toMap( + option -> option.key().toLowerCase(), ConfigOption::key)); + Map<String, String> optionalOptionKeysLowerCaseToOriginal = + factory.optionalOptions().stream() + .collect( + Collectors.toMap( + option -> option.key().toLowerCase(), ConfigOption::key)); + for (Map.Entry<String, String> entry : options.entrySet()) { + final String catalogOptionKey = entry.getKey(); + final String catalogOptionValue = entry.getValue(); + normalizedOptions.put( + requiredOptionKeysLowerCaseToOriginal.containsKey( + catalogOptionKey.toLowerCase()) + ? requiredOptionKeysLowerCaseToOriginal.get( + catalogOptionKey.toLowerCase()) + : optionalOptionKeysLowerCaseToOriginal.getOrDefault( + catalogOptionKey.toLowerCase(), catalogOptionKey), + catalogOptionValue); + } + return normalizedOptions; + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQRowDataConverter.java b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQRowDataConverter.java new file mode 100644 index 0000000..9c5c8af --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQRowDataConverter.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.sink.table; + +import org.apache.rocketmq.common.message.Message; + +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkState; +import static org.apache.rocketmq.flink.sink.table.RocketMQDynamicTableSink.WritableMetadata; +import static org.apache.rocketmq.flink.sink.table.RocketMQDynamicTableSink.WritableMetadata.KEYS; +import static org.apache.rocketmq.flink.sink.table.RocketMQDynamicTableSink.WritableMetadata.TAGS; + +/** RocketMQRowDataConverter converts the row data of table to RocketMQ message pattern. */ +public class RocketMQRowDataConverter implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RocketMQRowDataConverter.class); + + private final String topic; + private final String tag; + private final String dynamicColumn; + private final String fieldDelimiter; + private final String encoding; + + private final boolean isDynamicTag; + private final boolean isDynamicTagIncluded; + private final boolean writeKeysToBody; + private boolean onlyVarbinary = false; + + private final String[] keyColumns; + private final RowTypeInfo rowTypeInfo; + private final DataType[] fieldDataTypes; + + private int[] keyFieldIndexes; + private int[] tagFieldIndexes; + private int[] bodyFieldIndexes; + private DataType[] bodyFieldTypes; + + private final boolean hasMetadata; + private final int[] metadataPositions; + + public RocketMQRowDataConverter( + String topic, + String tag, + String dynamicColumn, + String fieldDelimiter, + String encoding, + boolean isDynamicTag, + boolean isDynamicTagIncluded, + boolean writeKeysToBody, + String[] keyColumns, + RowTypeInfo rowTypeInfo, + DataType[] fieldDataTypes, + boolean hasMetadata, + int[] metadataPositions) { + this.topic = topic; + this.tag = tag; + this.dynamicColumn = dynamicColumn; + this.fieldDelimiter = fieldDelimiter; + this.encoding = encoding; + this.isDynamicTag = isDynamicTag; + this.isDynamicTagIncluded = isDynamicTagIncluded; + this.writeKeysToBody = writeKeysToBody; + this.keyColumns = keyColumns; + this.rowTypeInfo = rowTypeInfo; + this.fieldDataTypes = fieldDataTypes; + this.hasMetadata = hasMetadata; + this.metadataPositions = metadataPositions; + } + + public void open() { + if (rowTypeInfo.getArity() == 1 + && rowTypeInfo.getFieldTypes()[0].getTypeClass().equals(byte[].class)) { + onlyVarbinary = true; + } + Set<Integer> excludedFields = new HashSet<>(); + if (keyColumns != null) { + keyFieldIndexes = new int[keyColumns.length]; + for (int index = 0; index < keyColumns.length; index++) { + int fieldIndex = rowTypeInfo.getFieldIndex(keyColumns[index]); + checkState( + fieldIndex >= 0, + String.format( + "[MetaQConverter] Could not find the message-key column: %s.", + keyColumns[index])); + keyFieldIndexes[index] = fieldIndex; + if (!writeKeysToBody) { + excludedFields.add(fieldIndex); + } + } + } else { + keyFieldIndexes = new int[0]; + } + if (isDynamicTag && dynamicColumn != null) { + tagFieldIndexes = new int[1]; + int fieldIndex = rowTypeInfo.getFieldIndex(dynamicColumn); + checkState( + fieldIndex >= 0, + String.format( + "[MetaQConverter] Could not find the tag column: %s.", dynamicColumn)); + tagFieldIndexes[0] = fieldIndex; + if (!isDynamicTagIncluded) { + excludedFields.add(fieldIndex); + } + } else { + tagFieldIndexes = new int[0]; + } + bodyFieldIndexes = new int[rowTypeInfo.getArity() - excludedFields.size()]; + bodyFieldTypes = new DataType[rowTypeInfo.getArity() - excludedFields.size()]; + int index = 0; + for (int num = 0; num < rowTypeInfo.getArity(); num++) { + if (!excludedFields.contains(num)) { + bodyFieldIndexes[index] = num; + bodyFieldTypes[index++] = fieldDataTypes[num]; + } + } + } + + public Message convert(RowData row) { + if (row.getRowKind() != RowKind.INSERT && row.getRowKind() != RowKind.UPDATE_AFTER) { + return null; + } + Message message = new Message(); + message.setTopic(topic); + List<String> keys = new ArrayList<>(); + for (int fieldIndex : keyFieldIndexes) { + keys.add(row.getString(fieldIndex).toString()); + } + if (keys.size() > 0) { + message.setKeys(keys); + } + if (!isDynamicTag) { + if (tag != null && tag.length() > 0) { + message.setTags(tag); + } + } else { + checkState(tagFieldIndexes.length > 0, "No message tag column set."); + message.setTags(row.getString(tagFieldIndexes[0]).toString()); + } + if (onlyVarbinary) { + message.setBody(row.getBinary(0)); + message.setWaitStoreMsgOK(true); + } else { + Object[] values = new Object[bodyFieldIndexes.length]; + for (int index = 0; index < bodyFieldIndexes.length; index++) { + values[index] = + RowData.createFieldGetter( + bodyFieldTypes[index].getLogicalType(), + bodyFieldIndexes[index]) + .getFieldOrNull(row); + } + try { + message.setBody(StringUtils.join(values, fieldDelimiter).getBytes(encoding)); + message.setWaitStoreMsgOK(true); + } catch (UnsupportedEncodingException e) { + LOG.error( + String.format( + "Unsupported ''{%s}'' encoding charset. Check the encoding configItem in the DDL.", + encoding), + e); + } + } + if (hasMetadata) { + String messageKeys = readMetadata(row, KEYS); + message.setKeys(messageKeys); + message.setTags(readMetadata(row, TAGS)); + } + return message; + } + + @SuppressWarnings("unchecked") + private <T> T readMetadata(RowData consumedRow, WritableMetadata metadata) { + final int pos = metadataPositions[metadata.ordinal()]; + if (pos < 0) { + return null; + } + return (T) metadata.converter.read(consumedRow, pos); + } + + // -------------------------------------------------------------------------------------------- + + interface MetadataConverter extends Serializable { + Object read(RowData consumedRow, int pos); + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQRowDataSink.java b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQRowDataSink.java new file mode 100644 index 0000000..ac01829 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQRowDataSink.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * <p>http://www.apache.org/licenses/LICENSE-2.0 + * + * <p>Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.flink.sink.table; + +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.flink.legacy.RocketMQSink; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.table.data.RowData; + +/** RocketMQRowDataSink helps for writing the converted row data of table to RocketMQ messages. */ +public class RocketMQRowDataSink extends RichSinkFunction<RowData> { + + private static final long serialVersionUID = 1L; + + private final RocketMQSink sink; + private final RocketMQRowDataConverter converter; + + public RocketMQRowDataSink(RocketMQSink sink, RocketMQRowDataConverter converter) { + this.sink = sink; + this.converter = converter; + } + + @Override + public void open(Configuration configuration) throws Exception { + sink.open(configuration); + converter.open(); + } + + @Override + public void setRuntimeContext(RuntimeContext runtimeContext) { + sink.setRuntimeContext(runtimeContext); + } + + @Override + public void invoke(RowData rowData, Context context) throws Exception { + Message message = converter.convert(rowData); + if (message != null) { + sink.invoke(message, context); + } + } + + @Override + public void close() { + sink.close(); + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java index 990e28b..36e3dc6 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java +++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java @@ -42,22 +42,22 @@ import java.util.TimeZone; import java.util.stream.Collectors; import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper; -import static org.apache.rocketmq.flink.source.common.RocketMQOptions.CONSUMER_GROUP; -import static org.apache.rocketmq.flink.source.common.RocketMQOptions.NAME_SERVER_ADDRESS; -import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_COLUMN_ERROR_DEBUG; -import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_ENCODING; -import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_END_TIME; -import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_FIELD_DELIMITER; -import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_LENGTH_CHECK; -import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_LINE_DELIMITER; -import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS; -import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_START_MESSAGE_OFFSET; -import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_START_TIME; -import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_START_TIME_MILLS; -import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_TAG; -import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_TIME_ZONE; -import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_USE_NEW_API; -import static org.apache.rocketmq.flink.source.common.RocketMQOptions.TOPIC; +import static org.apache.rocketmq.flink.common.RocketMQOptions.CONSUMER_GROUP; +import static org.apache.rocketmq.flink.common.RocketMQOptions.NAME_SERVER_ADDRESS; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_COLUMN_ERROR_DEBUG; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_ENCODING; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_END_TIME; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_FIELD_DELIMITER; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_LENGTH_CHECK; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_LINE_DELIMITER; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_START_MESSAGE_OFFSET; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_START_TIME; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_START_TIME_MILLS; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_TAG; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_TIME_ZONE; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_USE_NEW_API; +import static org.apache.rocketmq.flink.common.RocketMQOptions.TOPIC; /** * Defines the {@link DynamicTableSourceFactory} implementation to create {@link diff --git a/src/main/java/org/apache/rocketmq/flink/source/util/StringSerializer.java b/src/main/java/org/apache/rocketmq/flink/source/util/StringSerializer.java index c0a67b8..e2bcf22 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/util/StringSerializer.java +++ b/src/main/java/org/apache/rocketmq/flink/source/util/StringSerializer.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.flink.source.util; -import java.util.Base64; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.binary.BinaryStringData; import org.apache.flink.table.data.util.DataFormatConverters; @@ -30,6 +29,7 @@ import java.math.BigInteger; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.Base64; import java.util.Set; /** String serializer. */ diff --git a/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index 32de8b2..b164722 100644 --- a/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -13,4 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.rocketmq.flink.source.table.RocketMQDynamicTableSourceFactory \ No newline at end of file +org.apache.rocketmq.flink.source.table.RocketMQDynamicTableSourceFactory +org.apache.rocketmq.flink.sink.table.RocketMQDynamicTableSinkFactory \ No newline at end of file diff --git a/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java b/src/test/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSinkFactoryTest.java similarity index 64% copy from src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java copy to src/test/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSinkFactoryTest.java index 184a23f..b06695a 100644 --- a/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java +++ b/src/test/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSinkFactoryTest.java @@ -16,9 +16,9 @@ * limitations under the License. */ -package org.apache.rocketmq.flink.source.table; +package org.apache.rocketmq.flink.sink.table; -import org.apache.rocketmq.flink.source.common.RocketMQOptions; +import org.apache.rocketmq.flink.common.RocketMQOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.Schema; @@ -28,7 +28,7 @@ import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; -import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.factories.FactoryUtil; import org.junit.Test; @@ -42,8 +42,8 @@ import static org.apache.flink.table.api.DataTypes.STRING; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -/** Tests for {@link RocketMQDynamicTableSourceFactory}. */ -public class RocketMQDynamicTableSourceFactoryTest { +/** Tests for {@link org.apache.rocketmq.flink.sink.table.RocketMQDynamicTableSinkFactory}. */ +public class RocketMQDynamicTableSinkFactoryTest { private static final ResolvedSchema SCHEMA = new ResolvedSchema( @@ -52,60 +52,56 @@ public class RocketMQDynamicTableSourceFactoryTest { null); private static final String IDENTIFIER = "rocketmq"; - private static final String TOPIC = "test_source"; - private static final String CONSUMER_GROUP = "test_consumer"; - private static final String NAME_SERVER_ADDRESS = "127.0.0.1:9876"; + private static final String TOPIC = "test_sink"; + private static final String PRODUCER_GROUP = "test_producer"; + private static final String NAME_SERVER_ADDRESS = + "http://${instanceId}.${region}.mq-internal.aliyuncs.com:8080"; @Test - public void testRocketMQDynamicTableSourceWithLegalOption() { + public void testRocketMQDynamicTableSinkWithLegalOption() { final Map<String, String> options = new HashMap<>(); options.put("connector", IDENTIFIER); options.put(RocketMQOptions.TOPIC.key(), TOPIC); - options.put(RocketMQOptions.CONSUMER_GROUP.key(), CONSUMER_GROUP); + options.put(RocketMQOptions.PRODUCER_GROUP.key(), PRODUCER_GROUP); options.put(RocketMQOptions.NAME_SERVER_ADDRESS.key(), NAME_SERVER_ADDRESS); - final DynamicTableSource tableSource = createTableSource(options); - assertTrue(tableSource instanceof RocketMQScanTableSource); - assertEquals(RocketMQScanTableSource.class.getName(), tableSource.asSummaryString()); + final DynamicTableSink tableSink = createDynamicTableSink(options); + assertTrue(tableSink instanceof RocketMQDynamicTableSink); + assertEquals(RocketMQDynamicTableSink.class.getName(), tableSink.asSummaryString()); } @Test(expected = ValidationException.class) - public void testRocketMQDynamicTableSourceWithoutRequiredOption() { + public void testRocketMQDynamicTableSinkWithoutRequiredOption() { final Map<String, String> options = new HashMap<>(); options.put("connector", IDENTIFIER); options.put(RocketMQOptions.TOPIC.key(), TOPIC); - options.put(RocketMQOptions.CONSUMER_GROUP.key(), CONSUMER_GROUP); + options.put(RocketMQOptions.PRODUCER_GROUP.key(), PRODUCER_GROUP); options.put(RocketMQOptions.OPTIONAL_TAG.key(), "test_tag"); - createTableSource(options); + createDynamicTableSink(options); } @Test(expected = ValidationException.class) - public void testRocketMQDynamicTableSourceWithUnknownOption() { + public void testRocketMQDynamicTableSinkWithUnknownOption() { final Map<String, String> options = new HashMap<>(); options.put(RocketMQOptions.TOPIC.key(), TOPIC); - options.put(RocketMQOptions.CONSUMER_GROUP.key(), CONSUMER_GROUP); + options.put(RocketMQOptions.PRODUCER_GROUP.key(), PRODUCER_GROUP); options.put(RocketMQOptions.NAME_SERVER_ADDRESS.key(), NAME_SERVER_ADDRESS); options.put("unknown", "test_option"); - createTableSource(options); + createDynamicTableSink(options); } - private static DynamicTableSource createTableSource( - Map<String, String> options, Configuration conf) { - return FactoryUtil.createTableSource( + private static DynamicTableSink createDynamicTableSink(Map<String, String> options) { + return FactoryUtil.createTableSink( null, - ObjectIdentifier.of("default", "default", IDENTIFIER), + ObjectIdentifier.of("default", "default", "mq"), new ResolvedCatalogTable( CatalogTable.of( Schema.newBuilder().fromResolvedSchema(SCHEMA).build(), - "mock source", + "mock sink", Collections.emptyList(), options), SCHEMA), - conf, - RocketMQDynamicTableSourceFactory.class.getClassLoader(), + new Configuration(), + RocketMQDynamicTableSinkFactory.class.getClassLoader(), false); } - - private static DynamicTableSource createTableSource(Map<String, String> options) { - return createTableSource(options, new Configuration()); - } } diff --git a/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java b/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java index 184a23f..377e63b 100644 --- a/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java +++ b/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java @@ -18,7 +18,7 @@ package org.apache.rocketmq.flink.source.table; -import org.apache.rocketmq.flink.source.common.RocketMQOptions; +import org.apache.rocketmq.flink.common.RocketMQOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.Schema;
