This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit 4edc4daf5e7f6dca981744a6f3f963103916d397 Author: SteNicholas <[email protected]> AuthorDate: Wed Aug 18 15:03:04 2021 +0800 [#715] Support the RocketMQ TableSource based on the legacy Source implementation (#779) --- pom.xml | 4 +- .../rocketmq/flink/legacy/RocketMQConfig.java | 22 +- .../apache/rocketmq/flink/legacy/RocketMQSink.java | 22 +- ...etMQSource.java => RocketMQSourceFunction.java} | 28 +- .../rocketmq/flink/legacy/RunningChecker.java | 22 +- .../common/selector/DefaultTopicSelector.java | 22 +- .../common/selector/SimpleTopicSelector.java | 22 +- .../legacy/common/selector/TopicSelector.java | 22 +- .../KeyValueDeserializationSchema.java | 22 +- .../serialization/KeyValueSerializationSchema.java | 22 +- .../RowKeyValueDeserializationSchema.java | 407 +++++++++++++++++++++ .../SimpleKeyValueDeserializationSchema.java | 22 +- .../SimpleKeyValueSerializationSchema.java | 22 +- .../flink/legacy/common/util/MetricUtils.java | 22 +- .../flink/legacy/common/util/RetryUtil.java | 22 +- .../flink/legacy/common/util/RocketMQUtils.java | 22 +- .../flink/legacy/common/util/TestUtils.java | 22 +- .../flink/legacy/example/RocketMQFlinkExample.java | 5 +- .../flink/source/common/RocketMQOptions.java | 3 + .../deserializer/RowDeserializationSchema.java | 2 +- .../table/RocketMQDynamicTableSourceFactory.java | 6 +- .../source/table/RocketMQScanTableSource.java | 65 +++- .../rocketmq/flink/legacy/RocketMQSinkTest.java | 24 +- .../rocketmq/flink/legacy/RocketMQSourceTest.java | 28 +- .../common/selector/DefaultTopicSelectorTest.java | 24 +- .../common/selector/SimpleTopicSelectorTest.java | 24 +- .../RowKeyValueDeserializationSchemaTest.java | 50 +++ .../SimpleKeyValueSerializationSchemaTest.java | 24 +- .../RocketMQDynamicTableSourceFactoryTest.java | 111 ++++++ 29 files changed, 887 insertions(+), 226 deletions(-) diff --git a/pom.xml b/pom.xml index abec905..0675bb7 100644 --- a/pom.xml +++ b/pom.xml @@ -34,7 +34,7 @@ <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <rocketmq.version>4.7.1</rocketmq.version> - <flink.version>1.13.0</flink.version> + <flink.version>1.13.1</flink.version> <commons-lang.version>2.5</commons-lang.version> <scala.binary.version>2.11</scala.binary.version> <spotless.version>2.4.2</spotless.version> @@ -45,11 +45,13 @@ <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> + <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> + <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java index 5c19b7a..fc257a1 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java @@ -1,15 +1,17 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.rocketmq.flink.legacy; diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java index f91a684..b6e1793 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java @@ -1,15 +1,17 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.rocketmq.flink.legacy; diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java similarity index 96% rename from src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSource.java rename to src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java index 84260b6..8821a6d 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSource.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java @@ -1,15 +1,17 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.rocketmq.flink.legacy; @@ -82,12 +84,12 @@ import static org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getLong * guarantees when checkpoints are enabled. Otherwise, the source doesn't provide any reliability * guarantees. */ -public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> +public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT> implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> { private static final long serialVersionUID = 1L; - private static final Logger log = LoggerFactory.getLogger(RocketMQSource.class); + private static final Logger log = LoggerFactory.getLogger(RocketMQSourceFunction.class); private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states"; private RunningChecker runningChecker; private transient DefaultMQPullConsumer consumer; @@ -115,7 +117,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> private Meter tpsMetric; - public RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props) { + public RocketMQSourceFunction(KeyValueDeserializationSchema<OUT> schema, Properties props) { this.schema = schema; this.props = props; } diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RunningChecker.java b/src/main/java/org/apache/rocketmq/flink/legacy/RunningChecker.java index c48361a..f36b727 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/RunningChecker.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/RunningChecker.java @@ -1,15 +1,17 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.rocketmq.flink.legacy; diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelector.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelector.java index 6be5218..128b19e 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelector.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelector.java @@ -1,15 +1,17 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.rocketmq.flink.legacy.common.selector; diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelector.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelector.java index 674b5a0..dcdaa2f 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelector.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelector.java @@ -1,15 +1,17 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.rocketmq.flink.legacy.common.selector; diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/TopicSelector.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/TopicSelector.java index 581dadc..a70c599 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/TopicSelector.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/TopicSelector.java @@ -1,15 +1,17 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.rocketmq.flink.legacy.common.selector; diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueDeserializationSchema.java index 4cc8c61..8d0c778 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueDeserializationSchema.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueDeserializationSchema.java @@ -1,15 +1,17 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.rocketmq.flink.legacy.common.serialization; diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueSerializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueSerializationSchema.java index 66b2e29..0000772 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueSerializationSchema.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueSerializationSchema.java @@ -1,15 +1,17 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.rocketmq.flink.legacy.common.serialization; diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java new file mode 100644 index 0000000..bc43b1c --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.flink.legacy.common.serialization; + +import org.apache.rocketmq.flink.source.reader.deserializer.DirtyDataStrategy; +import org.apache.rocketmq.flink.source.util.ByteSerializer; +import org.apache.rocketmq.flink.source.util.StringSerializer; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.UnsupportedEncodingException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +/** + * * The row based implementation of {@link KeyValueDeserializationSchema} for the deserialization + * of message key and value.. + */ +public class RowKeyValueDeserializationSchema implements KeyValueDeserializationSchema<RowData> { + + private static final long serialVersionUID = -1L; + private static final Logger logger = + LoggerFactory.getLogger(RowKeyValueDeserializationSchema.class); + + private transient TableSchema tableSchema; + private final DirtyDataStrategy formatErrorStrategy; + private final DirtyDataStrategy fieldMissingStrategy; + private final DirtyDataStrategy fieldIncrementStrategy; + private final String encoding; + private final String fieldDelimiter; + private final boolean columnErrorDebug; + private final int columnSize; + private final ByteSerializer.ValueType[] fieldTypes; + private final transient DataType[] fieldDataTypes; + private final Map<String, Integer> columnIndexMapping; + private long lastLogExceptionTime; + private long lastLogHandleFieldTime; + + private static final int DEFAULT_LOG_INTERVAL_MS = 60 * 1000; + + public RowKeyValueDeserializationSchema( + TableSchema tableSchema, + DirtyDataStrategy formatErrorStrategy, + DirtyDataStrategy fieldMissingStrategy, + DirtyDataStrategy fieldIncrementStrategy, + String encoding, + String fieldDelimiter, + boolean columnErrorDebug, + Map<String, String> properties) { + this.tableSchema = tableSchema; + this.formatErrorStrategy = formatErrorStrategy; + this.fieldMissingStrategy = fieldMissingStrategy; + this.fieldIncrementStrategy = fieldIncrementStrategy; + this.columnErrorDebug = columnErrorDebug; + this.encoding = encoding; + this.fieldDelimiter = StringEscapeUtils.unescapeJava(fieldDelimiter); + this.columnSize = tableSchema.getFieldNames().length; + this.fieldTypes = new ByteSerializer.ValueType[columnSize]; + this.columnIndexMapping = new HashMap<>(); + for (int index = 0; index < columnSize; index++) { + this.columnIndexMapping.put(tableSchema.getFieldNames()[index], index); + } + for (int index = 0; index < columnSize; index++) { + ByteSerializer.ValueType type = + ByteSerializer.getTypeIndex(tableSchema.getFieldTypes()[index].getTypeClass()); + this.fieldTypes[index] = type; + } + + DescriptorProperties descriptorProperties = new DescriptorProperties(); + descriptorProperties.putProperties(properties); + this.fieldDataTypes = tableSchema.getFieldDataTypes(); + this.lastLogExceptionTime = System.currentTimeMillis(); + this.lastLogHandleFieldTime = System.currentTimeMillis(); + } + + @Override + public RowData deserializeKeyAndValue(byte[] key, byte[] value) { + if (isOnlyHaveVarbinaryDataField()) { + GenericRowData rowData = new GenericRowData(columnSize); + rowData.setField(0, value); + return rowData; + } else { + if (value == null) { + logger.info("Deserialize empty BytesMessage body, ignore the empty message."); + return null; + } + return deserializeValue(value); + } + } + + @Override + public TypeInformation<RowData> getProducedType() { + return InternalTypeInfo.of((RowType) tableSchema.toRowDataType().getLogicalType()); + } + + private boolean isOnlyHaveVarbinaryDataField() { + if (columnSize == 1) { + return isByteArrayType(tableSchema.getFieldNames()[0]); + } + return false; + } + + private RowData deserializeValue(byte[] value) { + String body; + try { + body = new String(value, encoding); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + String[] data = StringUtils.splitPreserveAllTokens(body, fieldDelimiter); + if (columnSize == 1) { + data = new String[1]; + data[0] = body; + } + if (data.length < columnSize) { + data = handleFieldMissing(data); + } else if (data.length > columnSize) { + data = handleFieldIncrement(data); + } + if (data == null) { + return null; + } + GenericRowData rowData = new GenericRowData(columnSize); + boolean skip = false; + for (int index = 0; index < columnSize; index++) { + try { + String fieldValue = getValue(data, body, index); + rowData.setField( + index, + StringSerializer.deserialize( + fieldValue, + fieldTypes[index], + fieldDataTypes[index], + new HashSet<>())); + } catch (Exception e) { + skip = handleException(rowData, index, data, e); + } + } + if (skip) { + return null; + } + return rowData; + } + + private String getValue(String[] data, String line, int index) { + String fieldValue = null; + if (columnSize == 1) { + fieldValue = line; + } else { + if (index < data.length) { + fieldValue = data[index]; + } + } + return fieldValue; + } + + private boolean isByteArrayType(String fieldName) { + TypeInformation<?> typeInformation = + tableSchema.getFieldTypes()[columnIndexMapping.get(fieldName)]; + if (typeInformation != null) { + ByteSerializer.ValueType valueType = + ByteSerializer.getTypeIndex(typeInformation.getTypeClass()); + return valueType == ByteSerializer.ValueType.V_ByteArray; + } + return false; + } + + private boolean handleException(GenericRowData row, int index, Object[] data, Exception e) { + boolean skip = false; + switch (formatErrorStrategy) { + case SKIP: + long now = System.currentTimeMillis(); + if (columnErrorDebug || now - lastLogExceptionTime > DEFAULT_LOG_INTERVAL_MS) { + logger.warn( + "Data format error, field type: " + + fieldTypes[index] + + "field data: " + + data[index] + + ", index: " + + index + + ", data: [" + + StringUtils.join(data, ",") + + "]", + e); + lastLogExceptionTime = now; + } + skip = true; + break; + case SKIP_SILENT: + skip = true; + break; + default: + case CUT: + case NULL: + case PAD: + row.setField(index, null); + break; + case EXCEPTION: + throw new RuntimeException(e); + } + + return skip; + } + + private String[] handleFieldMissing(String[] data) { + String fieldMissingMessage = + String.format( + "Field missing exception, table column number: %d, data column number: %d, data field number: %d, data: [%s].", + columnSize, columnSize, data.length, StringUtils.join(data, ",")); + switch (fieldMissingStrategy) { + default: + case SKIP: + long now = System.currentTimeMillis(); + if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) { + logger.warn(fieldMissingMessage); + lastLogHandleFieldTime = now; + } + return null; + case SKIP_SILENT: + return null; + case CUT: + case NULL: + case PAD: + return data; + case EXCEPTION: + logger.error(fieldMissingMessage); + throw new RuntimeException(fieldMissingMessage); + } + } + + private String[] handleFieldIncrement(String[] data) { + String fieldIncrementMessage = + String.format( + "Field increment exception, table column number: %d, data column number: %d, data field number: %d, data: [%s].", + columnSize, columnSize, data.length, StringUtils.join(data, ",")); + switch (fieldIncrementStrategy) { + case SKIP: + long now = System.currentTimeMillis(); + if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) { + logger.warn(fieldIncrementMessage); + lastLogHandleFieldTime = now; + } + return null; + case SKIP_SILENT: + return null; + default: + case CUT: + case NULL: + case PAD: + return data; + case EXCEPTION: + logger.error(fieldIncrementMessage); + throw new RuntimeException(fieldIncrementMessage); + } + } + + /** Builder of {@link RowKeyValueDeserializationSchema}. */ + public static class Builder { + + private TableSchema schema; + private DirtyDataStrategy formatErrorStrategy = DirtyDataStrategy.SKIP; + private DirtyDataStrategy fieldMissingStrategy = DirtyDataStrategy.SKIP; + private DirtyDataStrategy fieldIncrementStrategy = DirtyDataStrategy.CUT; + private String encoding = "UTF-8"; + private String fieldDelimiter = "\u0001"; + private boolean columnErrorDebug = false; + private Map<String, String> properties; + + public Builder() {} + + public Builder setTableSchema(TableSchema tableSchema) { + this.schema = tableSchema; + return this; + } + + public Builder setFormatErrorStrategy(DirtyDataStrategy formatErrorStrategy) { + this.formatErrorStrategy = formatErrorStrategy; + return this; + } + + public Builder setFieldMissingStrategy(DirtyDataStrategy fieldMissingStrategy) { + this.fieldMissingStrategy = fieldMissingStrategy; + return this; + } + + public Builder setFieldIncrementStrategy(DirtyDataStrategy fieldIncrementStrategy) { + this.fieldIncrementStrategy = fieldIncrementStrategy; + return this; + } + + public Builder setEncoding(String encoding) { + this.encoding = encoding; + return this; + } + + public Builder setFieldDelimiter(String fieldDelimiter) { + this.fieldDelimiter = fieldDelimiter; + return this; + } + + public Builder setColumnErrorDebug(boolean columnErrorDebug) { + this.columnErrorDebug = columnErrorDebug; + return this; + } + + public Builder setProperties(Map<String, String> properties) { + this.properties = properties; + if (null == properties) { + return this; + } + Configuration configuration = new Configuration(); + for (String key : properties.keySet()) { + configuration.setString(key, properties.get(key)); + } + String lengthCheck = configuration.get(CollectorOption.LENGTH_CHECK); + switch (lengthCheck.toUpperCase()) { + case "SKIP": + { + this.setFormatErrorStrategy(DirtyDataStrategy.SKIP); + this.setFieldMissingStrategy(DirtyDataStrategy.SKIP); + this.setFieldIncrementStrategy(DirtyDataStrategy.SKIP); + } + break; + case "PAD": + { + this.setFormatErrorStrategy(DirtyDataStrategy.SKIP); + this.setFieldMissingStrategy(DirtyDataStrategy.PAD); + this.setFieldIncrementStrategy(DirtyDataStrategy.CUT); + } + break; + case "EXCEPTION": + { + this.setFormatErrorStrategy(DirtyDataStrategy.EXCEPTION); + this.setFieldMissingStrategy(DirtyDataStrategy.EXCEPTION); + this.setFieldIncrementStrategy(DirtyDataStrategy.EXCEPTION); + } + break; + case "SKIP_SILENT": + { + this.setFormatErrorStrategy(DirtyDataStrategy.SKIP_SILENT); + this.setFieldMissingStrategy(DirtyDataStrategy.SKIP_SILENT); + this.setFieldIncrementStrategy(DirtyDataStrategy.SKIP_SILENT); + } + break; + default: + } + this.setEncoding(configuration.getString(CollectorOption.ENCODING)); + this.setFieldDelimiter(configuration.getString(CollectorOption.FIELD_DELIMITER)); + this.setColumnErrorDebug(configuration.getBoolean(CollectorOption.COLUMN_ERROR_DEBUG)); + return this; + } + + public RowKeyValueDeserializationSchema build() { + return new RowKeyValueDeserializationSchema( + schema, + formatErrorStrategy, + fieldMissingStrategy, + fieldIncrementStrategy, + encoding, + fieldDelimiter, + columnErrorDebug, + properties); + } + } + + /** Options for {@link RowKeyValueDeserializationSchema}. */ + public static class CollectorOption { + public static final ConfigOption<String> ENCODING = + ConfigOptions.key("encoding".toLowerCase()).defaultValue("UTF-8"); + public static final ConfigOption<String> FIELD_DELIMITER = + ConfigOptions.key("fieldDelimiter".toLowerCase()).defaultValue("\u0001"); + public static final ConfigOption<Boolean> COLUMN_ERROR_DEBUG = + ConfigOptions.key("columnErrorDebug".toLowerCase()).defaultValue(true); + public static final ConfigOption<String> LENGTH_CHECK = + ConfigOptions.key("lengthCheck".toLowerCase()).defaultValue("NONE"); + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java index 7dada93..456f477 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java @@ -1,15 +1,17 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.rocketmq.flink.legacy.common.serialization; diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchema.java index 3e92ad2..c3ae600 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchema.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchema.java @@ -1,15 +1,17 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.rocketmq.flink.legacy.common.serialization; diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java index bb3baeb..a54a29a 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java @@ -1,15 +1,17 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.rocketmq.flink.legacy.common.util; diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java index 7ec1dca..e53caf1 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java @@ -1,15 +1,17 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.rocketmq.flink.legacy.common.util; diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java index 94a24a1..1f084a8 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java @@ -1,15 +1,17 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.rocketmq.flink.legacy.common.util; diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java index 407aec7..70c26d9 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java @@ -1,15 +1,17 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.rocketmq.flink.legacy.common.util; diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQFlinkExample.java b/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQFlinkExample.java index b435726..fc0d3cb 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQFlinkExample.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQFlinkExample.java @@ -20,7 +20,7 @@ package org.apache.rocketmq.flink.legacy.example; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.flink.legacy.RocketMQConfig; import org.apache.rocketmq.flink.legacy.RocketMQSink; -import org.apache.rocketmq.flink.legacy.RocketMQSource; +import org.apache.rocketmq.flink.legacy.RocketMQSourceFunction; import org.apache.rocketmq.flink.legacy.common.serialization.SimpleTupleDeserializationSchema; import org.apache.rocketmq.flink.legacy.function.SinkMapFunction; import org.apache.rocketmq.flink.legacy.function.SourceMapFunction; @@ -114,7 +114,8 @@ public class RocketMQFlinkExample { SimpleTupleDeserializationSchema schema = new SimpleTupleDeserializationSchema(); DataStreamSource<Tuple2<String, String>> source = - env.addSource(new RocketMQSource<>(schema, consumerProps)).setParallelism(2); + env.addSource(new RocketMQSourceFunction<>(schema, consumerProps)) + .setParallelism(2); source.print(); source.process(new SourceMapFunction()) diff --git a/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java b/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java index 064e193..000e090 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java +++ b/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java @@ -53,6 +53,9 @@ public class RocketMQOptions { public static final ConfigOption<Long> OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS = ConfigOptions.key("partitionDiscoveryIntervalMs").longType().defaultValue(30000L); + public static final ConfigOption<Boolean> OPTIONAL_USE_NEW_API = + ConfigOptions.key("useNewApi").booleanType().defaultValue(true); + public static final ConfigOption<String> OPTIONAL_ENCODING = ConfigOptions.key("encoding").stringType().defaultValue("UTF-8"); diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java index f106693..b946016 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java +++ b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java @@ -109,7 +109,7 @@ public class RowDeserializationSchema this.fieldTypes = new ValueType[totalColumnSize]; this.columnIndexMapping = new HashMap<>(); this.dataIndexMapping = new HashMap<>(); - for (int index = 0; index < tableSchema.getFieldNames().length; index++) { + for (int index = 0; index < totalColumnSize; index++) { this.columnIndexMapping.put(tableSchema.getFieldNames()[index], index); } for (int index = 0; index < totalColumnSize; index++) { diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java index ec41fc6..990e28b 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java +++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java @@ -56,6 +56,7 @@ import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_S import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_START_TIME_MILLS; import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_TAG; import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_TIME_ZONE; +import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_USE_NEW_API; import static org.apache.rocketmq.flink.source.common.RocketMQOptions.TOPIC; /** @@ -90,6 +91,7 @@ public class RocketMQDynamicTableSourceFactory implements DynamicTableSourceFact optionalOptions.add(OPTIONAL_END_TIME); optionalOptions.add(OPTIONAL_TIME_ZONE); optionalOptions.add(OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS); + optionalOptions.add(OPTIONAL_USE_NEW_API); optionalOptions.add(OPTIONAL_ENCODING); optionalOptions.add(OPTIONAL_FIELD_DELIMITER); optionalOptions.add(OPTIONAL_LINE_DELIMITER); @@ -146,6 +148,7 @@ public class RocketMQDynamicTableSourceFactory implements DynamicTableSourceFact } long partitionDiscoveryIntervalMs = configuration.getLong(OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS); + boolean useNewApi = configuration.getBoolean(OPTIONAL_USE_NEW_API); DescriptorProperties descriptorProperties = new DescriptorProperties(); descriptorProperties.putProperties(rawProperties); TableSchema physicalSchema = @@ -161,7 +164,8 @@ public class RocketMQDynamicTableSourceFactory implements DynamicTableSourceFact stopInMs, startMessageOffset, startMessageOffset < 0 ? startTime : -1L, - partitionDiscoveryIntervalMs); + partitionDiscoveryIntervalMs, + useNewApi); } private void transformContext( diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java index 37ab6a5..2a0d28a 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java +++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java @@ -17,6 +17,10 @@ package org.apache.rocketmq.flink.source.table; +import org.apache.rocketmq.flink.legacy.RocketMQConfig; +import org.apache.rocketmq.flink.legacy.RocketMQSourceFunction; +import org.apache.rocketmq.flink.legacy.common.serialization.KeyValueDeserializationSchema; +import org.apache.rocketmq.flink.legacy.common.serialization.RowKeyValueDeserializationSchema; import org.apache.rocketmq.flink.source.RocketMQSource; import org.apache.rocketmq.flink.source.reader.deserializer.BytesMessage; import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQDeserializationSchema; @@ -28,6 +32,7 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceFunctionProvider; import org.apache.flink.table.connector.source.SourceProvider; import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; import org.apache.flink.table.data.RowData; @@ -39,6 +44,7 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.stream.Stream; import static org.apache.flink.api.connector.source.Boundedness.BOUNDED; @@ -59,6 +65,7 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading private final long partitionDiscoveryIntervalMs; private final long startMessageOffset; private final long startTime; + private final boolean useNewApi; private List<String> metadataKeys; @@ -72,7 +79,8 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading long stopInMs, long startMessageOffset, long startTime, - long partitionDiscoveryIntervalMs) { + long partitionDiscoveryIntervalMs, + boolean useNewApi) { this.properties = properties; this.schema = schema; this.topic = topic; @@ -83,6 +91,7 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading this.startMessageOffset = startMessageOffset; this.startTime = startTime; this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs; + this.useNewApi = useNewApi; this.metadataKeys = Collections.emptyList(); } @@ -93,18 +102,25 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { - return SourceProvider.of( - new RocketMQSource<>( - topic, - consumerGroup, - nameServerAddress, - tag, - stopInMs, - startTime, - startMessageOffset < 0 ? 0 : startMessageOffset, - partitionDiscoveryIntervalMs, - isBounded() ? BOUNDED : CONTINUOUS_UNBOUNDED, - createDeserializationSchema())); + if (useNewApi) { + return SourceProvider.of( + new RocketMQSource<>( + topic, + consumerGroup, + nameServerAddress, + tag, + stopInMs, + startTime, + startMessageOffset < 0 ? 0 : startMessageOffset, + partitionDiscoveryIntervalMs, + isBounded() ? BOUNDED : CONTINUOUS_UNBOUNDED, + createRocketMQDeserializationSchema())); + } else { + return SourceFunctionProvider.of( + new RocketMQSourceFunction<>( + createKeyValueDeserializationSchema(), getConsumerProps()), + isBounded()); + } } @Override @@ -133,17 +149,18 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading stopInMs, startMessageOffset, startTime, - partitionDiscoveryIntervalMs); + partitionDiscoveryIntervalMs, + useNewApi); tableSource.metadataKeys = metadataKeys; return tableSource; } @Override public String asSummaryString() { - return "RocketMQScanTableSource"; + return RocketMQScanTableSource.class.getName(); } - private RocketMQDeserializationSchema<RowData> createDeserializationSchema() { + private RocketMQDeserializationSchema<RowData> createRocketMQDeserializationSchema() { final MetadataConverter[] metadataConverters = metadataKeys.stream() .map( @@ -162,6 +179,22 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading return stopInMs != Long.MAX_VALUE; } + private KeyValueDeserializationSchema<RowData> createKeyValueDeserializationSchema() { + return new RowKeyValueDeserializationSchema.Builder() + .setProperties(properties.asMap()) + .setTableSchema(schema) + .build(); + } + + private Properties getConsumerProps() { + Properties consumerProps = new Properties(); + consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, topic); + consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, consumerGroup); + consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameServerAddress); + consumerProps.setProperty(RocketMQConfig.CONSUMER_TAG, tag); + return consumerProps; + } + // -------------------------------------------------------------------------------------------- // Metadata handling // -------------------------------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSinkTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSinkTest.java index c45dbdf..ad3c0b1 100644 --- a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSinkTest.java +++ b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSinkTest.java @@ -1,17 +1,21 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.rocketmq.flink.legacy; import org.apache.rocketmq.client.producer.DefaultMQProducer; diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java index a863ddd..7ce124d 100644 --- a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java +++ b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java @@ -1,17 +1,21 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.rocketmq.flink.legacy; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; @@ -50,7 +54,7 @@ import static org.mockito.Mockito.when; @Ignore public class RocketMQSourceTest { - private RocketMQSource rocketMQSource; + private RocketMQSourceFunction rocketMQSource; private MQPullConsumerScheduleService pullConsumerScheduleService; private DefaultMQPullConsumer consumer; private KeyValueDeserializationSchema deserializationSchema; @@ -60,7 +64,7 @@ public class RocketMQSourceTest { public void setUp() throws Exception { deserializationSchema = new SimpleKeyValueDeserializationSchema(); Properties props = new Properties(); - rocketMQSource = new RocketMQSource(deserializationSchema, props); + rocketMQSource = new RocketMQSourceFunction(deserializationSchema, props); setFieldValue(rocketMQSource, "topic", topic); setFieldValue(rocketMQSource, "runningChecker", new SingleRunningCheck()); diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelectorTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelectorTest.java index b235c63..aa1528a 100644 --- a/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelectorTest.java +++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelectorTest.java @@ -1,17 +1,21 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.rocketmq.flink.legacy.common.selector; import org.junit.Test; diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelectorTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelectorTest.java index 5c0f755..dc93d14 100644 --- a/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelectorTest.java +++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelectorTest.java @@ -1,17 +1,21 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.rocketmq.flink.legacy.common.selector; import org.junit.Test; diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchemaTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchemaTest.java new file mode 100644 index 0000000..2c1786a --- /dev/null +++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchemaTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.legacy.common.serialization; + +import org.apache.rocketmq.common.message.MessageExt; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; + +import org.junit.Test; + +import java.util.HashMap; + +import static org.junit.Assert.assertEquals; + +/** Test for {@link RowKeyValueDeserializationSchema}. */ +public class RowKeyValueDeserializationSchemaTest { + + @Test + public void testDeserializeKeyAndValue() { + TableSchema tableSchema = + new TableSchema.Builder().field("varchar", DataTypes.VARCHAR(100)).build(); + RowKeyValueDeserializationSchema deserializationSchema = + new RowKeyValueDeserializationSchema.Builder() + .setTableSchema(tableSchema) + .setProperties(new HashMap<>()) + .build(); + MessageExt messageExt = new MessageExt(); + messageExt.setBody("test_deserialize_key_and_value".getBytes()); + RowData rowData = deserializationSchema.deserializeKeyAndValue(null, messageExt.getBody()); + assertEquals(new String(messageExt.getBody()), rowData.getString(0).toString()); + } +} diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java index 78baf20..7e2e0d9 100644 --- a/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java +++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java @@ -1,17 +1,21 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * <p>http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * <p>Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.rocketmq.flink.legacy.common.serialization; import org.junit.Test; diff --git a/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java b/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java new file mode 100644 index 0000000..184a23f --- /dev/null +++ b/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.source.table; + +import org.apache.rocketmq.flink.source.common.RocketMQOptions; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.FactoryUtil; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.table.api.DataTypes.STRING; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Tests for {@link RocketMQDynamicTableSourceFactory}. */ +public class RocketMQDynamicTableSourceFactoryTest { + + private static final ResolvedSchema SCHEMA = + new ResolvedSchema( + Collections.singletonList(Column.physical("name", STRING().notNull())), + new ArrayList<>(), + null); + + private static final String IDENTIFIER = "rocketmq"; + private static final String TOPIC = "test_source"; + private static final String CONSUMER_GROUP = "test_consumer"; + private static final String NAME_SERVER_ADDRESS = "127.0.0.1:9876"; + + @Test + public void testRocketMQDynamicTableSourceWithLegalOption() { + final Map<String, String> options = new HashMap<>(); + options.put("connector", IDENTIFIER); + options.put(RocketMQOptions.TOPIC.key(), TOPIC); + options.put(RocketMQOptions.CONSUMER_GROUP.key(), CONSUMER_GROUP); + options.put(RocketMQOptions.NAME_SERVER_ADDRESS.key(), NAME_SERVER_ADDRESS); + final DynamicTableSource tableSource = createTableSource(options); + assertTrue(tableSource instanceof RocketMQScanTableSource); + assertEquals(RocketMQScanTableSource.class.getName(), tableSource.asSummaryString()); + } + + @Test(expected = ValidationException.class) + public void testRocketMQDynamicTableSourceWithoutRequiredOption() { + final Map<String, String> options = new HashMap<>(); + options.put("connector", IDENTIFIER); + options.put(RocketMQOptions.TOPIC.key(), TOPIC); + options.put(RocketMQOptions.CONSUMER_GROUP.key(), CONSUMER_GROUP); + options.put(RocketMQOptions.OPTIONAL_TAG.key(), "test_tag"); + createTableSource(options); + } + + @Test(expected = ValidationException.class) + public void testRocketMQDynamicTableSourceWithUnknownOption() { + final Map<String, String> options = new HashMap<>(); + options.put(RocketMQOptions.TOPIC.key(), TOPIC); + options.put(RocketMQOptions.CONSUMER_GROUP.key(), CONSUMER_GROUP); + options.put(RocketMQOptions.NAME_SERVER_ADDRESS.key(), NAME_SERVER_ADDRESS); + options.put("unknown", "test_option"); + createTableSource(options); + } + + private static DynamicTableSource createTableSource( + Map<String, String> options, Configuration conf) { + return FactoryUtil.createTableSource( + null, + ObjectIdentifier.of("default", "default", IDENTIFIER), + new ResolvedCatalogTable( + CatalogTable.of( + Schema.newBuilder().fromResolvedSchema(SCHEMA).build(), + "mock source", + Collections.emptyList(), + options), + SCHEMA), + conf, + RocketMQDynamicTableSourceFactory.class.getClassLoader(), + false); + } + + private static DynamicTableSource createTableSource(Map<String, String> options) { + return createTableSource(options, new Configuration()); + } +}
