This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit 340ea48bbd68dc199679da52e35f707f9756a1b8 Author: SteNicholas <[email protected]> AuthorDate: Thu Jun 3 11:51:32 2021 +0800 [#715] Support the RocketMQ TableSource based on the new Source interface (#716) --- pom.xml | 13 +- .../rocketmq/flink/source/RocketMQSource.java | 18 +- .../flink/source/common/RocketMQOptions.java | 70 +++ .../enumerator/RocketMQSourceEnumerator.java | 20 +- .../reader/RocketMQPartitionSplitReader.java | 23 +- .../source/reader/deserializer/BytesMessage.java | 53 ++ ...ationSchema.java => DeserializationSchema.java} | 12 +- .../reader/deserializer/DirtyDataStrategy.java | 28 + ...ema.java => RocketMQDeserializationSchema.java} | 8 +- .../RocketMQRowDeserializationSchema.java | 104 ++++ .../deserializer/RowDeserializationSchema.java | 606 +++++++++++++++++++++ .../table/RocketMQDynamicTableSourceFactory.java | 211 +++++++ .../source/table/RocketMQScanTableSource.java | 195 +++++++ .../rocketmq/flink/source/util/ByteSerializer.java | 156 ++++++ .../rocketmq/flink/source/util/ByteUtils.java | 219 ++++++++ .../flink/source/util/StringSerializer.java | 155 ++++++ .../org.apache.flink.table.factories.Factory | 16 + .../RocketMQRowDeserializationSchemaTest.java | 141 +++++ 18 files changed, 2018 insertions(+), 30 deletions(-) diff --git a/pom.xml b/pom.xml index d5fc49f..abec905 100644 --- a/pom.xml +++ b/pom.xml @@ -34,7 +34,7 @@ <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <rocketmq.version>4.7.1</rocketmq.version> - <flink.version>1.12.2</flink.version> + <flink.version>1.13.0</flink.version> <commons-lang.version>2.5</commons-lang.version> <scala.binary.version>2.11</scala.binary.version> <spotless.version>2.4.2</spotless.version> @@ -78,6 +78,12 @@ </dependency> <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + <dependency> + 
<groupId>org.apache.flink</groupId> <artifactId>flink-queryable-state-runtime_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> @@ -102,6 +108,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-test</artifactId> + <version>${rocketmq.version}</version> + </dependency> <dependency> <groupId>commons-lang</groupId> diff --git a/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java index b899618..79a8149 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java +++ b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java @@ -24,7 +24,7 @@ import org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator; import org.apache.rocketmq.flink.source.reader.RocketMQPartitionSplitReader; import org.apache.rocketmq.flink.source.reader.RocketMQRecordEmitter; import org.apache.rocketmq.flink.source.reader.RocketMQSourceReader; -import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQRecordDeserializationSchema; +import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQDeserializationSchema; import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit; import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplitSerializer; @@ -52,10 +52,11 @@ import java.util.function.Supplier; public class RocketMQSource<OUT> implements Source<OUT, RocketMQPartitionSplit, RocketMQSourceEnumState>, ResultTypeQueryable<OUT> { - private static final long serialVersionUID = -6755372893283732098L; + private static final long serialVersionUID = -1L; private final String topic; private final String consumerGroup; + private final String nameServerAddress; private final String tag; private final long stopInMs; private final long startTime; @@ -64,20 +65,22 @@ public class RocketMQSource<OUT> // Boundedness private final Boundedness 
boundedness; - private final RocketMQRecordDeserializationSchema<OUT> deserializationSchema; + private final RocketMQDeserializationSchema<OUT> deserializationSchema; public RocketMQSource( String topic, String consumerGroup, + String nameServerAddress, String tag, long stopInMs, long startTime, long startOffset, long partitionDiscoveryIntervalMs, Boundedness boundedness, - RocketMQRecordDeserializationSchema<OUT> deserializationSchema) { + RocketMQDeserializationSchema<OUT> deserializationSchema) { this.topic = topic; this.consumerGroup = consumerGroup; + this.nameServerAddress = nameServerAddress; this.tag = tag; this.stopInMs = stopInMs; this.startTime = startTime; @@ -93,8 +96,8 @@ public class RocketMQSource<OUT> } @Override - public SourceReader<OUT, RocketMQPartitionSplit> createReader(SourceReaderContext readerContext) - throws Exception { + public SourceReader<OUT, RocketMQPartitionSplit> createReader( + SourceReaderContext readerContext) { FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<OUT, Long, Long>>> elementsQueue = new FutureCompletingBlockingQueue<>(); deserializationSchema.open( @@ -115,6 +118,7 @@ public class RocketMQSource<OUT> new RocketMQPartitionSplitReader<>( topic, consumerGroup, + nameServerAddress, tag, stopInMs, startTime, @@ -136,6 +140,7 @@ public class RocketMQSource<OUT> return new RocketMQSourceEnumerator( topic, consumerGroup, + nameServerAddress, stopInMs, startOffset, partitionDiscoveryIntervalMs, @@ -150,6 +155,7 @@ public class RocketMQSource<OUT> return new RocketMQSourceEnumerator( topic, consumerGroup, + nameServerAddress, stopInMs, startOffset, partitionDiscoveryIntervalMs, diff --git a/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java b/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java new file mode 100644 index 0000000..064e193 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java @@ -0,0 +1,70 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.source.common; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** Includes config options of RocketMQ connector type. 
*/ +public class RocketMQOptions { + + public static final ConfigOption<String> TOPIC = ConfigOptions.key("topic").noDefaultValue(); + + public static final ConfigOption<String> CONSUMER_GROUP = + ConfigOptions.key("consumerGroup").noDefaultValue(); + + public static final ConfigOption<String> NAME_SERVER_ADDRESS = + ConfigOptions.key("nameServerAddress").noDefaultValue(); + + public static final ConfigOption<String> OPTIONAL_TAG = + ConfigOptions.key("tag").noDefaultValue(); + + public static final ConfigOption<Integer> OPTIONAL_START_MESSAGE_OFFSET = + ConfigOptions.key("startMessageOffset").defaultValue(-1); + + public static final ConfigOption<Long> OPTIONAL_START_TIME_MILLS = + ConfigOptions.key("startTimeMs".toLowerCase()).longType().defaultValue(-1L); + + public static final ConfigOption<String> OPTIONAL_START_TIME = + ConfigOptions.key("startTime".toLowerCase()).stringType().noDefaultValue(); + + public static final ConfigOption<String> OPTIONAL_END_TIME = + ConfigOptions.key("endTime").noDefaultValue(); + + public static final ConfigOption<String> OPTIONAL_TIME_ZONE = + ConfigOptions.key("timeZone".toLowerCase()).stringType().noDefaultValue(); + + public static final ConfigOption<Long> OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS = + ConfigOptions.key("partitionDiscoveryIntervalMs").longType().defaultValue(30000L); + + public static final ConfigOption<String> OPTIONAL_ENCODING = + ConfigOptions.key("encoding").stringType().defaultValue("UTF-8"); + + public static final ConfigOption<String> OPTIONAL_FIELD_DELIMITER = + ConfigOptions.key("fieldDelimiter").stringType().defaultValue("\u0001"); + + public static final ConfigOption<String> OPTIONAL_LINE_DELIMITER = + ConfigOptions.key("lineDelimiter").stringType().defaultValue("\n"); + + public static final ConfigOption<Boolean> OPTIONAL_COLUMN_ERROR_DEBUG = + ConfigOptions.key("columnErrorDebug").booleanType().defaultValue(true); + + public static final ConfigOption<String> OPTIONAL_LENGTH_CHECK = + 
ConfigOptions.key("lengthCheck").stringType().defaultValue("NONE"); +} diff --git a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java index 08290c6..61b563a 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java +++ b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java @@ -19,7 +19,6 @@ package org.apache.rocketmq.flink.source.enumerator; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; -import org.apache.rocketmq.client.consumer.MQPullConsumer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit; @@ -38,6 +37,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -59,6 +59,8 @@ public class RocketMQSourceEnumerator private final String topic; /** The consumer group used for this RocketMQSource. */ private final String consumerGroup; + /** The name server address used for this RocketMQSource. */ + private final String nameServerAddress; /** The stop timestamp for this RocketMQSource. */ private final long stopInMs; /** The start offset for this RocketMQSource. */ @@ -85,12 +87,13 @@ public class RocketMQSourceEnumerator private final Map<Integer, Set<RocketMQPartitionSplit>> pendingPartitionSplitAssignment; // Lazily instantiated or mutable fields. 
- private MQPullConsumer consumer; + private DefaultMQPullConsumer consumer; private boolean noMoreNewPartitionSplits = false; public RocketMQSourceEnumerator( String topic, String consumerGroup, + String nameServerAddress, long stopInMs, long startOffset, long partitionDiscoveryIntervalMs, @@ -99,6 +102,7 @@ public class RocketMQSourceEnumerator this( topic, consumerGroup, + nameServerAddress, stopInMs, startOffset, partitionDiscoveryIntervalMs, @@ -110,6 +114,7 @@ public class RocketMQSourceEnumerator public RocketMQSourceEnumerator( String topic, String consumerGroup, + String nameServerAddress, long stopInMs, long startOffset, long partitionDiscoveryIntervalMs, @@ -118,6 +123,7 @@ public class RocketMQSourceEnumerator Map<Integer, List<RocketMQPartitionSplit>> currentSplitsAssignments) { this.topic = topic; this.consumerGroup = consumerGroup; + this.nameServerAddress = nameServerAddress; this.stopInMs = stopInMs; this.startOffset = startOffset; this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs; @@ -180,7 +186,7 @@ public class RocketMQSourceEnumerator } @Override - public RocketMQSourceEnumState snapshotState() { + public RocketMQSourceEnumState snapshotState(long checkpointId) { return new RocketMQSourceEnumState(readerIdToSplitAssignments); } @@ -298,6 +304,14 @@ public class RocketMQSourceEnumerator private void initialRocketMQConsumer() { try { consumer = new DefaultMQPullConsumer(consumerGroup); + consumer.setNamesrvAddr(nameServerAddress); + consumer.setInstanceName( + String.join( + "||", + ManagementFactory.getRuntimeMXBean().getName(), + topic, + consumerGroup, + "" + System.nanoTime())); consumer.start(); } catch (MQClientException e) { LOG.error("Failed to initial RocketMQ consumer.", e); diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java index 3bbeec8..41fbbea 100644 --- 
a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java +++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java @@ -19,13 +19,12 @@ package org.apache.rocketmq.flink.source.reader; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; -import org.apache.rocketmq.client.consumer.MQPullConsumer; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQRecordDeserializationSchema; +import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQDeserializationSchema; import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -43,6 +42,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.IOException; +import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -70,12 +70,12 @@ public class RocketMQPartitionSplitReader<T> private final long startTime; private final long startOffset; - private final RocketMQRecordDeserializationSchema<T> deserializationSchema; + private final RocketMQDeserializationSchema<T> deserializationSchema; private final Map<Tuple3<String, String, Integer>, Long> startingOffsets; private final Map<Tuple3<String, String, Integer>, Long> stoppingTimestamps; private final SimpleCollector<T> collector; - private MQPullConsumer consumer; + private DefaultMQPullConsumer consumer; private volatile boolean wakeup = false; @@ -84,11 +84,12 @@ public class RocketMQPartitionSplitReader<T> public RocketMQPartitionSplitReader( String topic, String consumerGroup, + String nameServerAddress, 
String tag, long stopInMs, long startTime, long startOffset, - RocketMQRecordDeserializationSchema<T> deserializationSchema) { + RocketMQDeserializationSchema<T> deserializationSchema) { this.topic = topic; this.tag = tag; this.stopInMs = stopInMs; @@ -98,7 +99,7 @@ public class RocketMQPartitionSplitReader<T> this.startingOffsets = new HashMap<>(); this.stoppingTimestamps = new HashMap<>(); this.collector = new SimpleCollector<>(); - initialRocketMQConsumer(consumerGroup); + initialRocketMQConsumer(consumerGroup, nameServerAddress); } @Override @@ -280,9 +281,17 @@ public class RocketMQPartitionSplitReader<T> // --------------- private helper method ---------------------- - private void initialRocketMQConsumer(String consumerGroup) { + private void initialRocketMQConsumer(String consumerGroup, String nameServerAddress) { try { consumer = new DefaultMQPullConsumer(consumerGroup); + consumer.setNamesrvAddr(nameServerAddress); + consumer.setInstanceName( + String.join( + "||", + ManagementFactory.getRuntimeMXBean().getName(), + topic, + consumerGroup, + "" + System.nanoTime())); consumer.start(); } catch (MQClientException e) { LOG.error("Failed to initial RocketMQ consumer.", e); diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/BytesMessage.java b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/BytesMessage.java new file mode 100644 index 0000000..d109a7f --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/BytesMessage.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.source.reader.deserializer; + +import java.util.HashMap; +import java.util.Map; + +/** Message contains byte array. */ +public class BytesMessage { + + private byte[] data; + private Map<String, String> properties = new HashMap<>(); + + public byte[] getData() { + return data; + } + + public void setData(byte[] data) { + this.data = data; + } + + public Map<String, String> getProperties() { + return properties; + } + + public void setProperties(Map<String, String> props) { + this.properties = props; + } + + public Object getProperty(String key) { + return properties.get(key); + } + + public void setProperty(String key, String value) { + properties.put(key, value); + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DeserializationSchema.java similarity index 77% copy from src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java copy to src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DeserializationSchema.java index 455f8af..3b087cc 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java +++ b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DeserializationSchema.java @@ -1,7 +1,5 @@ package org.apache.rocketmq.flink.source.reader.deserializer; -import org.apache.rocketmq.common.message.MessageExt; - import 
org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.DeserializationSchema.InitializationContext; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; @@ -9,11 +7,9 @@ import org.apache.flink.util.Collector; import java.io.IOException; import java.io.Serializable; -import java.util.List; -/** An interface for the deserialization of RocketMQ records. */ -public interface RocketMQRecordDeserializationSchema<T> - extends Serializable, ResultTypeQueryable<T> { +/** An interface for the deserialization of records. */ +public interface DeserializationSchema<IN, OUT> extends Serializable, ResultTypeQueryable<OUT> { /** * Initialization method for the schema. It is called before the actual working methods {@link @@ -35,9 +31,9 @@ public interface RocketMQRecordDeserializationSchema<T> * records can be buffered in memory or collecting records might delay emitting checkpoint * barrier. * - * @param record The MessageExts to deserialize. + * @param record The record to deserialize. * @param out The collector to put the resulting messages. */ @PublicEvolving - void deserialize(List<MessageExt> record, Collector<T> out) throws IOException; + void deserialize(IN record, Collector<OUT> out) throws IOException; } diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DirtyDataStrategy.java b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DirtyDataStrategy.java new file mode 100644 index 0000000..06a0c2d --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DirtyDataStrategy.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.source.reader.deserializer; + +/** Dirty data process strategy. */ +public enum DirtyDataStrategy { + SKIP, + SKIP_SILENT, + CUT, + PAD, + NULL, + EXCEPTION +} diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java similarity index 84% rename from src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java rename to src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java index 455f8af..6358e4c 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java +++ b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java @@ -4,16 +4,14 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.DeserializationSchema.InitializationContext; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.util.Collector; import java.io.IOException; -import java.io.Serializable; import java.util.List; /** An interface for the deserialization of RocketMQ records. 
*/ -public interface RocketMQRecordDeserializationSchema<T> - extends Serializable, ResultTypeQueryable<T> { +public interface RocketMQDeserializationSchema<T> + extends DeserializationSchema<List<MessageExt>, T> { /** * Initialization method for the schema. It is called before the actual working methods {@link @@ -25,7 +23,7 @@ public interface RocketMQRecordDeserializationSchema<T> * @param context Contextual information that can be used during initialization. */ @PublicEvolving - default void open(InitializationContext context) throws Exception {} + default void open(InitializationContext context) {} /** * Deserializes the byte message. diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRowDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRowDeserializationSchema.java new file mode 100644 index 0000000..5bd990e --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRowDeserializationSchema.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.flink.source.reader.deserializer; + +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.flink.source.reader.deserializer.RowDeserializationSchema.MetadataConverter; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.DeserializationSchema.InitializationContext; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * A row data wrapper class that wraps a {@link RocketMQDeserializationSchema} to deserialize {@link + * MessageExt}. + */ +public class RocketMQRowDeserializationSchema implements RocketMQDeserializationSchema<RowData> { + + private static final long serialVersionUID = 1L; + + private final RowDeserializationSchema deserializationSchema; + + private transient List<BytesMessage> bytesMessages = new ArrayList<>(1); + + public RocketMQRowDeserializationSchema( + TableSchema tableSchema, + Map<String, String> properties, + boolean hasMetadata, + MetadataConverter[] metadataConverters) { + deserializationSchema = + new RowDeserializationSchema.Builder() + .setProperties(properties) + .setTableSchema(tableSchema) + .setHasMetadata(hasMetadata) + .setMetadataConverters(metadataConverters) + .build(); + } + + @Override + public void open(InitializationContext context) { + deserializationSchema.open(context); + bytesMessages = new ArrayList<>(); + } + + @Override + public void deserialize(List<MessageExt> input, Collector<RowData> collector) { + extractMessages(input); + deserializationSchema.deserialize(bytesMessages, collector); + } + + @Override + public TypeInformation<RowData> getProducedType() { + return deserializationSchema.getProducedType(); + } + + private void extractMessages(List<MessageExt> messages) { + 
bytesMessages = new ArrayList<>(messages.size()); + for (MessageExt message : messages) { + BytesMessage bytesMessage = new BytesMessage(); + bytesMessage.setData(message.getBody()); + if (message.getProperties() != null) { + bytesMessage.setProperties(message.getProperties()); + } + bytesMessage.setProperty("__topic__", message.getTopic()); + bytesMessage.setProperty( + "__store_timestamp__", String.valueOf(message.getStoreTimestamp())); + bytesMessage.setProperty( + "__born_timestamp__", String.valueOf(message.getBornTimestamp())); + bytesMessage.setProperty("__queue_id__", String.valueOf(message.getQueueId())); + bytesMessage.setProperty("__queue_offset__", String.valueOf(message.getQueueOffset())); + bytesMessage.setProperty("__msg_id__", message.getMsgId()); + bytesMessage.setProperty("__keys__", message.getKeys()); + bytesMessage.setProperty("__tags__", message.getTags()); + bytesMessages.add(bytesMessage); + } + } + + @VisibleForTesting + public List<BytesMessage> getBytesMessages() { + return bytesMessages; + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java new file mode 100644 index 0000000..f106693 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java @@ -0,0 +1,606 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.source.reader.deserializer; + +import org.apache.rocketmq.flink.source.util.ByteSerializer; +import org.apache.rocketmq.flink.source.util.ByteSerializer.ValueType; +import org.apache.rocketmq.flink.source.util.StringSerializer; + +import org.apache.flink.api.common.serialization.DeserializationSchema.InitializationContext; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Collector; + +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.io.UnsupportedEncodingException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * The row based implementation of {@link DeserializationSchema} for the deserialization of records. 
+ */ +public class RowDeserializationSchema + implements DeserializationSchema<List<BytesMessage>, RowData> { + + private static final long serialVersionUID = -1L; + private static final Logger logger = LoggerFactory.getLogger(RowDeserializationSchema.class); + + private transient TableSchema tableSchema; + private final DirtyDataStrategy formatErrorStrategy; + private final DirtyDataStrategy fieldMissingStrategy; + private final DirtyDataStrategy fieldIncrementStrategy; + private final String encoding; + private final String fieldDelimiter; + private final String lineDelimiter; + private final boolean columnErrorDebug; + private final MetadataCollector metadataCollector; + private final int totalColumnSize; + private final int dataColumnSize; + private final ValueType[] fieldTypes; + private transient DataType[] fieldDataTypes; + private final Set<String> headerFields; + private final Map<String, String> properties; + private final Map<String, Integer> columnIndexMapping; + private final Map<Integer, Integer> dataIndexMapping; + private long lastLogExceptionTime; + private long lastLogHandleFieldTime; + + private static final int DEFAULT_LOG_INTERVAL_MS = 60 * 1000; + + public RowDeserializationSchema( + TableSchema tableSchema, + DirtyDataStrategy formatErrorStrategy, + DirtyDataStrategy fieldMissingStrategy, + DirtyDataStrategy fieldIncrementStrategy, + String encoding, + String fieldDelimiter, + String lineDelimiter, + boolean columnErrorDebug, + boolean hasMetadata, + MetadataConverter[] metadataConverters, + List<String> headerFields, + Map<String, String> properties) { + this.tableSchema = tableSchema; + this.formatErrorStrategy = formatErrorStrategy; + this.fieldMissingStrategy = fieldMissingStrategy; + this.fieldIncrementStrategy = fieldIncrementStrategy; + this.columnErrorDebug = columnErrorDebug; + this.encoding = encoding; + this.fieldDelimiter = StringEscapeUtils.unescapeJava(fieldDelimiter); + this.lineDelimiter = 
StringEscapeUtils.unescapeJava(lineDelimiter); + this.metadataCollector = new MetadataCollector(hasMetadata, metadataConverters); + this.headerFields = headerFields == null ? null : new HashSet<>(headerFields); + this.properties = properties; + this.totalColumnSize = tableSchema.getFieldNames().length; + int dataColumnSize = 0; + this.fieldTypes = new ValueType[totalColumnSize]; + this.columnIndexMapping = new HashMap<>(); + this.dataIndexMapping = new HashMap<>(); + for (int index = 0; index < tableSchema.getFieldNames().length; index++) { + this.columnIndexMapping.put(tableSchema.getFieldNames()[index], index); + } + for (int index = 0; index < totalColumnSize; index++) { + ValueType type = + ByteSerializer.getTypeIndex(tableSchema.getFieldTypes()[index].getTypeClass()); + this.fieldTypes[index] = type; + if (!isHeaderField(index)) { + dataIndexMapping.put(dataColumnSize, index); + dataColumnSize++; + } + } + this.dataColumnSize = dataColumnSize; + } + + @Override + public void open(InitializationContext context) { + DescriptorProperties descriptorProperties = new DescriptorProperties(); + descriptorProperties.putProperties(properties); + this.tableSchema = SchemaValidator.deriveTableSinkSchema(descriptorProperties); + this.fieldDataTypes = tableSchema.getFieldDataTypes(); + this.lastLogExceptionTime = System.currentTimeMillis(); + this.lastLogHandleFieldTime = System.currentTimeMillis(); + } + + @Override + public void deserialize(List<BytesMessage> messages, Collector<RowData> collector) { + metadataCollector.collector = collector; + deserialize(messages, metadataCollector); + } + + private void deserialize(List<BytesMessage> messages, MetadataCollector collector) { + if (null == messages || messages.size() == 0) { + return; + } + for (BytesMessage message : messages) { + collector.message = message; + if (isOnlyHaveVarbinaryDataField()) { + GenericRowData rowData = new GenericRowData(totalColumnSize); + int dataIndex = dataIndexMapping.get(0); + 
rowData.setField(dataIndex, message.getData()); + for (int index = 0; index < totalColumnSize; index++) { + if (index == dataIndex) { + continue; + } + String headerValue = getHeaderValue(message, index); + rowData.setField( + index, + StringSerializer.deserialize( + headerValue, + fieldTypes[index], + fieldDataTypes[index], + new HashSet<>())); + } + collector.collect(rowData); + } else if (isAllHeaderField()) { + GenericRowData rowData = new GenericRowData(totalColumnSize); + for (int index = 0; index < totalColumnSize; index++) { + String headerValue = getHeaderValue(message, index); + rowData.setField( + index, + StringSerializer.deserialize( + headerValue, + fieldTypes[index], + fieldDataTypes[index], + new HashSet<>())); + } + collector.collect(rowData); + } else { + if (message.getData() == null) { + logger.info("Deserialize empty BytesMessage body, ignore the empty message."); + return; + } + deserializeBytesMessage(message, collector); + } + } + } + + private boolean isOnlyHaveVarbinaryDataField() { + if (dataColumnSize == 1 && dataIndexMapping.size() == 1) { + int index = dataIndexMapping.get(0); + return isByteArrayType(tableSchema.getFieldNames()[index]); + } + return false; + } + + private boolean isAllHeaderField() { + return null != headerFields && headerFields.size() == tableSchema.getFieldNames().length; + } + + private void deserializeBytesMessage(BytesMessage message, Collector<RowData> collector) { + String body; + try { + body = new String(message.getData(), encoding); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + String[] lines = StringUtils.split(body, lineDelimiter); + for (String line : lines) { + String[] data = StringUtils.splitPreserveAllTokens(line, fieldDelimiter); + if (dataColumnSize == 1) { + data = new String[1]; + data[0] = line; + } + if (data.length < dataColumnSize) { + data = handleFieldMissing(data); + } else if (data.length > dataColumnSize) { + data = handleFieldIncrement(data); + } + 
if (data == null) { + continue; + } + GenericRowData rowData = new GenericRowData(totalColumnSize); + boolean skip = false; + for (int index = 0; index < totalColumnSize; index++) { + try { + String fieldValue = getValue(message, data, line, index); + rowData.setField( + index, + StringSerializer.deserialize( + fieldValue, + fieldTypes[index], + fieldDataTypes[index], + new HashSet<>())); + } catch (Exception e) { + skip = handleException(rowData, index, data, e); + } + } + if (skip) { + continue; + } + collector.collect(rowData); + } + } + + private boolean isHeaderField(int index) { + return headerFields != null && headerFields.contains(tableSchema.getFieldNames()[index]); + } + + private String getHeaderValue(BytesMessage message, int index) { + Object object = message.getProperty(tableSchema.getFieldNames()[index]); + return object == null ? "" : (String) object; + } + + private String getValue(BytesMessage message, String[] data, String line, int index) { + String fieldValue = null; + if (isHeaderField(index)) { + fieldValue = getHeaderValue(message, index); + } else { + if (dataColumnSize == 1) { + fieldValue = line; + } else { + if (index < data.length) { + fieldValue = data[index]; + } + } + } + + return fieldValue; + } + + private boolean isByteArrayType(String fieldName) { + TypeInformation<?> typeInformation = + tableSchema.getFieldTypes()[columnIndexMapping.get(fieldName)]; + if (typeInformation != null) { + ValueType valueType = ByteSerializer.getTypeIndex(typeInformation.getTypeClass()); + return valueType == ValueType.V_ByteArray; + } + return false; + } + + private boolean handleException(GenericRowData row, int index, Object[] data, Exception e) { + boolean skip = false; + switch (formatErrorStrategy) { + case SKIP: + long now = System.currentTimeMillis(); + if (columnErrorDebug || now - lastLogExceptionTime > DEFAULT_LOG_INTERVAL_MS) { + logger.warn( + "Data format error, field type: " + + fieldTypes[index] + + "field data: " + + data[index] + + 
", index: " + + index + + ", data: [" + + StringUtils.join(data, ",") + + "]", + e); + lastLogExceptionTime = now; + } + skip = true; + break; + case SKIP_SILENT: + skip = true; + break; + default: + case CUT: + case NULL: + case PAD: + row.setField(index, null); + break; + case EXCEPTION: + throw new RuntimeException(e); + } + + return skip; + } + + private String[] handleFieldMissing(String[] data) { + switch (fieldMissingStrategy) { + default: + case SKIP: + long now = System.currentTimeMillis(); + if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) { + logger.warn( + "Field missing error, table column number: " + + totalColumnSize + + ", data column number: " + + dataColumnSize + + ", data field number: " + + data.length + + ", data: [" + + StringUtils.join(data, ",") + + "]"); + lastLogHandleFieldTime = now; + } + return null; + case SKIP_SILENT: + return null; + case CUT: + case NULL: + case PAD: + { + String[] res = new String[totalColumnSize]; + for (int i = 0; i < data.length; ++i) { + Object dataIndex = dataIndexMapping.get(i); + if (dataIndex != null) { + res[(int) dataIndex] = data[i]; + } + } + return res; + } + case EXCEPTION: + throw new RuntimeException(); + } + } + + private String[] handleFieldIncrement(String[] data) { + switch (fieldIncrementStrategy) { + case SKIP: + long now = System.currentTimeMillis(); + if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) { + logger.warn( + "Field increment error, table column number: " + + totalColumnSize + + ", data column number: " + + dataColumnSize + + ", data field number: " + + data.length + + ", data: [" + + StringUtils.join(data, ",") + + "]"); + lastLogHandleFieldTime = now; + } + return null; + case SKIP_SILENT: + return null; + default: + case CUT: + case NULL: + case PAD: + { + String[] res = new String[totalColumnSize]; + for (int i = 0; i < dataColumnSize; ++i) { + Object dataIndex = dataIndexMapping.get(i); + if (dataIndex != null) { + 
res[(int) dataIndex] = data[i]; + } + } + return res; + } + case EXCEPTION: + throw new RuntimeException(); + } + } + + @Override + public TypeInformation<RowData> getProducedType() { + return InternalTypeInfo.of((RowType) tableSchema.toRowDataType().getLogicalType()); + } + + // -------------------------------------------------------------------------------------------- + + /** Source metadata converter interface. */ + public interface MetadataConverter extends Serializable { + Object read(BytesMessage message); + } + + // -------------------------------------------------------------------------------------------- + + /** Metadata of RowData collector. */ + public static final class MetadataCollector implements Collector<RowData>, Serializable { + + private static final long serialVersionUID = 1L; + + private final boolean hasMetadata; + private final MetadataConverter[] metadataConverters; + + public transient BytesMessage message; + public transient Collector<RowData> collector; + + public MetadataCollector(boolean hasMetadata, MetadataConverter[] metadataConverters) { + this.hasMetadata = hasMetadata; + this.metadataConverters = metadataConverters; + } + + @Override + public void collect(RowData physicalRow) { + if (hasMetadata) { + final int physicalArity = physicalRow.getArity(); + final int metadataArity = metadataConverters.length; + final GenericRowData producedRow = + new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity); + final GenericRowData genericPhysicalRow = (GenericRowData) physicalRow; + for (int index = 0; index < physicalArity; index++) { + producedRow.setField(index, genericPhysicalRow.getField(index)); + } + for (int index = 0; index < metadataArity; index++) { + producedRow.setField( + index + physicalArity, metadataConverters[index].read(message)); + } + collector.collect(producedRow); + } else { + collector.collect(physicalRow); + } + } + + @Override + public void close() { + // nothing to do + } + } + + /** Builder 
of {@link RowDeserializationSchema}. */ + public static class Builder { + + private TableSchema schema; + private DirtyDataStrategy formatErrorStrategy = DirtyDataStrategy.SKIP; + private DirtyDataStrategy fieldMissingStrategy = DirtyDataStrategy.SKIP; + private DirtyDataStrategy fieldIncrementStrategy = DirtyDataStrategy.CUT; + private String encoding = "UTF-8"; + private String lineDelimiter = "\n"; + private String fieldDelimiter = "\u0001"; + private boolean columnErrorDebug = false; + private boolean hasMetadata; + private MetadataConverter[] metadataConverters; + private List<String> headerFields; + private Map<String, String> properties; + + public Builder() {} + + public Builder setTableSchema(TableSchema tableSchema) { + this.schema = tableSchema; + return this; + } + + public Builder setFormatErrorStrategy(DirtyDataStrategy formatErrorStrategy) { + this.formatErrorStrategy = formatErrorStrategy; + return this; + } + + public Builder setFieldMissingStrategy(DirtyDataStrategy fieldMissingStrategy) { + this.fieldMissingStrategy = fieldMissingStrategy; + return this; + } + + public Builder setFieldIncrementStrategy(DirtyDataStrategy fieldIncrementStrategy) { + this.fieldIncrementStrategy = fieldIncrementStrategy; + return this; + } + + public Builder setEncoding(String encoding) { + this.encoding = encoding; + return this; + } + + public Builder setFieldDelimiter(String fieldDelimiter) { + this.fieldDelimiter = fieldDelimiter; + return this; + } + + public Builder setLineDelimiter(String lineDelimiter) { + this.lineDelimiter = lineDelimiter; + return this; + } + + public Builder setColumnErrorDebug(boolean columnErrorDebug) { + this.columnErrorDebug = columnErrorDebug; + return this; + } + + public Builder setHasMetadata(boolean hasMetadata) { + this.hasMetadata = hasMetadata; + return this; + } + + public Builder setMetadataConverters(MetadataConverter[] metadataConverters) { + this.metadataConverters = metadataConverters; + return this; + } + + public 
Builder setHeaderFields(List<String> headerFields) { + this.headerFields = headerFields; + return this; + } + + public Builder setProperties(Map<String, String> properties) { + this.properties = properties; + if (null == properties) { + return this; + } + Configuration configuration = new Configuration(); + for (String key : properties.keySet()) { + configuration.setString(key, properties.get(key)); + } + String lengthCheck = configuration.get(CollectorOption.LENGTH_CHECK); + switch (lengthCheck.toUpperCase()) { + case "SKIP": + { + this.setFormatErrorStrategy(DirtyDataStrategy.SKIP); + this.setFieldMissingStrategy(DirtyDataStrategy.SKIP); + this.setFieldIncrementStrategy(DirtyDataStrategy.SKIP); + } + break; + case "PAD": + { + this.setFormatErrorStrategy(DirtyDataStrategy.SKIP); + this.setFieldMissingStrategy(DirtyDataStrategy.PAD); + this.setFieldIncrementStrategy(DirtyDataStrategy.CUT); + } + break; + case "EXCEPTION": + { + this.setFormatErrorStrategy(DirtyDataStrategy.EXCEPTION); + this.setFieldMissingStrategy(DirtyDataStrategy.EXCEPTION); + this.setFieldIncrementStrategy(DirtyDataStrategy.EXCEPTION); + } + break; + case "SKIP_SILENT": + { + this.setFormatErrorStrategy(DirtyDataStrategy.SKIP_SILENT); + this.setFieldMissingStrategy(DirtyDataStrategy.SKIP_SILENT); + this.setFieldIncrementStrategy(DirtyDataStrategy.SKIP_SILENT); + } + break; + default: + } + this.setEncoding(configuration.getString(CollectorOption.ENCODING)); + this.setFieldDelimiter(configuration.getString(CollectorOption.FIELD_DELIMITER)); + this.setLineDelimiter(configuration.getString(CollectorOption.LINE_DELIMITER)); + this.setColumnErrorDebug(configuration.getBoolean(CollectorOption.COLUMN_ERROR_DEBUG)); + return this; + } + + public RowDeserializationSchema build() { + return new RowDeserializationSchema( + schema, + formatErrorStrategy, + fieldMissingStrategy, + fieldIncrementStrategy, + encoding, + fieldDelimiter, + lineDelimiter, + columnErrorDebug, + hasMetadata, + metadataConverters, 
+ headerFields, + properties); + } + } + + /** Options for {@link RowDeserializationSchema}. */ + public static class CollectorOption { + public static final ConfigOption<String> ENCODING = + ConfigOptions.key("encoding".toLowerCase()).defaultValue("UTF-8"); + public static final ConfigOption<String> FIELD_DELIMITER = + ConfigOptions.key("fieldDelimiter".toLowerCase()).defaultValue("\u0001"); + public static final ConfigOption<String> LINE_DELIMITER = + ConfigOptions.key("lineDelimiter".toLowerCase()).defaultValue("\n"); + public static final ConfigOption<Boolean> COLUMN_ERROR_DEBUG = + ConfigOptions.key("columnErrorDebug".toLowerCase()).defaultValue(true); + public static final ConfigOption<String> LENGTH_CHECK = + ConfigOptions.key("lengthCheck".toLowerCase()).defaultValue("NONE"); + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java new file mode 100644 index 0000000..ec41fc6 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.flink.source.table; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.Factory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.utils.TableSchemaUtils; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import org.apache.commons.lang3.time.FastDateFormat; + +import java.text.ParseException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; +import java.util.stream.Collectors; + +import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper; +import static org.apache.rocketmq.flink.source.common.RocketMQOptions.CONSUMER_GROUP; +import static org.apache.rocketmq.flink.source.common.RocketMQOptions.NAME_SERVER_ADDRESS; +import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_COLUMN_ERROR_DEBUG; +import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_ENCODING; +import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_END_TIME; +import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_FIELD_DELIMITER; +import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_LENGTH_CHECK; +import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_LINE_DELIMITER; +import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS; +import static 
org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_START_MESSAGE_OFFSET; +import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_START_TIME; +import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_START_TIME_MILLS; +import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_TAG; +import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_TIME_ZONE; +import static org.apache.rocketmq.flink.source.common.RocketMQOptions.TOPIC; + +/** + * Defines the {@link DynamicTableSourceFactory} implementation to create {@link + * RocketMQScanTableSource}. + */ +public class RocketMQDynamicTableSourceFactory implements DynamicTableSourceFactory { + + private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; + + @Override + public String factoryIdentifier() { + return "rocketmq"; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + Set<ConfigOption<?>> requiredOptions = new HashSet<>(); + requiredOptions.add(TOPIC); + requiredOptions.add(CONSUMER_GROUP); + requiredOptions.add(NAME_SERVER_ADDRESS); + return requiredOptions; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + Set<ConfigOption<?>> optionalOptions = new HashSet<>(); + optionalOptions.add(OPTIONAL_TAG); + optionalOptions.add(OPTIONAL_START_MESSAGE_OFFSET); + optionalOptions.add(OPTIONAL_START_TIME_MILLS); + optionalOptions.add(OPTIONAL_START_TIME); + optionalOptions.add(OPTIONAL_END_TIME); + optionalOptions.add(OPTIONAL_TIME_ZONE); + optionalOptions.add(OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS); + optionalOptions.add(OPTIONAL_ENCODING); + optionalOptions.add(OPTIONAL_FIELD_DELIMITER); + optionalOptions.add(OPTIONAL_LINE_DELIMITER); + optionalOptions.add(OPTIONAL_COLUMN_ERROR_DEBUG); + optionalOptions.add(OPTIONAL_LENGTH_CHECK); + return optionalOptions; + } + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + transformContext(this, context); + 
FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context); + helper.validate(); + Map<String, String> rawProperties = context.getCatalogTable().getOptions(); + Configuration configuration = Configuration.fromMap(rawProperties); + String topic = configuration.getString(TOPIC); + String consumerGroup = configuration.getString(CONSUMER_GROUP); + String nameServerAddress = configuration.getString(NAME_SERVER_ADDRESS); + String tag = configuration.getString(OPTIONAL_TAG); + int startMessageOffset = configuration.getInteger(OPTIONAL_START_MESSAGE_OFFSET); + long startTimeMs = configuration.getLong(OPTIONAL_START_TIME_MILLS); + String startDateTime = configuration.getString(OPTIONAL_START_TIME); + String timeZone = configuration.getString(OPTIONAL_TIME_ZONE); + long startTime = startTimeMs; + if (startTime == -1) { + if (!StringUtils.isNullOrWhitespaceOnly(startDateTime)) { + try { + startTime = parseDateString(startDateTime, timeZone); + } catch (ParseException e) { + throw new RuntimeException( + String.format( + "Incorrect datetime format: %s, pls use ISO-8601 " + + "complete date plus hours, minutes and seconds format:%s.", + startDateTime, DATE_FORMAT), + e); + } + } + } + long stopInMs = Long.MAX_VALUE; + String endDateTime = configuration.getString(OPTIONAL_END_TIME); + if (!StringUtils.isNullOrWhitespaceOnly(endDateTime)) { + try { + stopInMs = parseDateString(endDateTime, timeZone); + } catch (ParseException e) { + throw new RuntimeException( + String.format( + "Incorrect datetime format: %s, pls use ISO-8601 " + + "complete date plus hours, minutes and seconds format:%s.", + endDateTime, DATE_FORMAT), + e); + } + Preconditions.checkArgument( + stopInMs >= startTime, "Start time should be less than stop time."); + } + long partitionDiscoveryIntervalMs = + configuration.getLong(OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS); + DescriptorProperties descriptorProperties = new DescriptorProperties(); + 
descriptorProperties.putProperties(rawProperties); + TableSchema physicalSchema = + TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); + descriptorProperties.putTableSchema("schema", physicalSchema); + return new RocketMQScanTableSource( + descriptorProperties, + physicalSchema, + topic, + consumerGroup, + nameServerAddress, + tag, + stopInMs, + startMessageOffset, + startMessageOffset < 0 ? startTime : -1L, + partitionDiscoveryIntervalMs); + } + + private void transformContext( + DynamicTableFactory factory, DynamicTableFactory.Context context) { + Map<String, String> catalogOptions = context.getCatalogTable().getOptions(); + Map<String, String> convertedOptions = + normalizeOptionCaseAsFactory(factory, catalogOptions); + catalogOptions.clear(); + for (Map.Entry<String, String> entry : convertedOptions.entrySet()) { + catalogOptions.put(entry.getKey(), entry.getValue()); + } + } + + private Map<String, String> normalizeOptionCaseAsFactory( + Factory factory, Map<String, String> options) { + Map<String, String> normalizedOptions = new HashMap<>(); + Map<String, String> requiredOptionKeysLowerCaseToOriginal = + factory.requiredOptions().stream() + .collect( + Collectors.toMap( + option -> option.key().toLowerCase(), ConfigOption::key)); + Map<String, String> optionalOptionKeysLowerCaseToOriginal = + factory.optionalOptions().stream() + .collect( + Collectors.toMap( + option -> option.key().toLowerCase(), ConfigOption::key)); + for (Map.Entry<String, String> entry : options.entrySet()) { + final String catalogOptionKey = entry.getKey(); + final String catalogOptionValue = entry.getValue(); + normalizedOptions.put( + requiredOptionKeysLowerCaseToOriginal.containsKey( + catalogOptionKey.toLowerCase()) + ? 
requiredOptionKeysLowerCaseToOriginal.get( + catalogOptionKey.toLowerCase()) + : optionalOptionKeysLowerCaseToOriginal.getOrDefault( + catalogOptionKey.toLowerCase(), catalogOptionKey), + catalogOptionValue); + } + return normalizedOptions; + } + + private Long parseDateString(String dateString, String timeZone) throws ParseException { + FastDateFormat simpleDateFormat = + FastDateFormat.getInstance(DATE_FORMAT, TimeZone.getTimeZone(timeZone)); + return simpleDateFormat.parse(dateString).getTime(); + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java new file mode 100644 index 0000000..37ab6a5 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.flink.source.table; + +import org.apache.rocketmq.flink.source.RocketMQSource; +import org.apache.rocketmq.flink.source.reader.deserializer.BytesMessage; +import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQDeserializationSchema; +import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQRowDeserializationSchema; +import org.apache.rocketmq.flink.source.reader.deserializer.RowDeserializationSchema.MetadataConverter; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceProvider; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.types.DataType; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.apache.flink.api.connector.source.Boundedness.BOUNDED; +import static org.apache.flink.api.connector.source.Boundedness.CONTINUOUS_UNBOUNDED; + +/** Defines the scan table source of RocketMQ. 
*/ +public class RocketMQScanTableSource implements ScanTableSource, SupportsReadingMetadata { + + private final DescriptorProperties properties; + private final TableSchema schema; + + private final String topic; + private final String consumerGroup; + private final String nameServerAddress; + private final String tag; + + private final long stopInMs; + private final long partitionDiscoveryIntervalMs; + private final long startMessageOffset; + private final long startTime; + + private List<String> metadataKeys; + + public RocketMQScanTableSource( + DescriptorProperties properties, + TableSchema schema, + String topic, + String consumerGroup, + String nameServerAddress, + String tag, + long stopInMs, + long startMessageOffset, + long startTime, + long partitionDiscoveryIntervalMs) { + this.properties = properties; + this.schema = schema; + this.topic = topic; + this.consumerGroup = consumerGroup; + this.nameServerAddress = nameServerAddress; + this.tag = tag; + this.stopInMs = stopInMs; + this.startMessageOffset = startMessageOffset; + this.startTime = startTime; + this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs; + this.metadataKeys = Collections.emptyList(); + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { + return SourceProvider.of( + new RocketMQSource<>( + topic, + consumerGroup, + nameServerAddress, + tag, + stopInMs, + startTime, + startMessageOffset < 0 ? 0 : startMessageOffset, + partitionDiscoveryIntervalMs, + isBounded() ? 
BOUNDED : CONTINUOUS_UNBOUNDED, + createDeserializationSchema())); + } + + @Override + public Map<String, DataType> listReadableMetadata() { + final Map<String, DataType> metadataMap = new LinkedHashMap<>(); + Stream.of(ReadableMetadata.values()) + .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType)); + return metadataMap; + } + + @Override + public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) { + this.metadataKeys = metadataKeys; + } + + @Override + public DynamicTableSource copy() { + RocketMQScanTableSource tableSource = + new RocketMQScanTableSource( + properties, + schema, + topic, + consumerGroup, + nameServerAddress, + tag, + stopInMs, + startMessageOffset, + startTime, + partitionDiscoveryIntervalMs); + tableSource.metadataKeys = metadataKeys; + return tableSource; + } + + @Override + public String asSummaryString() { + return "RocketMQScanTableSource"; + } + + private RocketMQDeserializationSchema<RowData> createDeserializationSchema() { + final MetadataConverter[] metadataConverters = + metadataKeys.stream() + .map( + k -> + Stream.of(ReadableMetadata.values()) + .filter(rm -> rm.key.equals(k)) + .findFirst() + .orElseThrow(IllegalStateException::new)) + .map(m -> m.converter) + .toArray(MetadataConverter[]::new); + return new RocketMQRowDeserializationSchema( + schema, properties.asMap(), metadataKeys.size() > 0, metadataConverters); + } + + private boolean isBounded() { + return stopInMs != Long.MAX_VALUE; + } + + // -------------------------------------------------------------------------------------------- + // Metadata handling + // -------------------------------------------------------------------------------------------- + + enum ReadableMetadata { + TOPIC( + "topic", + DataTypes.STRING().notNull(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(BytesMessage message) { + return StringData.fromString( + 
String.valueOf(message.getProperty("__topic__"))); + } + }); + + final String key; + + final DataType dataType; + + final MetadataConverter converter; + + ReadableMetadata(String key, DataType dataType, MetadataConverter converter) { + this.key = key; + this.dataType = dataType; + this.converter = converter; + } + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/source/util/ByteSerializer.java b/src/main/java/org/apache/rocketmq/flink/source/util/ByteSerializer.java new file mode 100644 index 0000000..358cb84 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/source/util/ByteSerializer.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.source.util; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.Period; + +/** BytesSerializer is responsible to deserialize field from byte array. 
*/ +public class ByteSerializer { + + public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; + + public static Object deserialize(byte[] value, ValueType type) { + return deserialize(value, type, DEFAULT_CHARSET); + } + + public static Object deserialize(byte[] value, ValueType type, Charset charset) { + switch (type) { + case V_String: + return null == value ? "" : new String(value, charset); + case V_Timestamp: // sql.Timestamp encoded as long + return new Timestamp(ByteUtils.toLong(value)); + case V_Date: // sql.Date encoded as long + return new Date(ByteUtils.toLong(value)); + case V_Time: // sql.Time encoded as long + return new Time(ByteUtils.toLong(value)); + case V_BigDecimal: + return ByteUtils.toBigDecimal(value); + default: + return commonDeserialize(value, type); + } + } + + private static Object commonDeserialize(byte[] value, ValueType type) { + switch (type) { + case V_ByteArray: // byte[] + return value; + case V_Byte: // byte + return null == value ? (byte) '\0' : value[0]; + case V_Short: + return ByteUtils.toShort(value); + case V_Integer: + return ByteUtils.toInt(value); + case V_Long: + return ByteUtils.toLong(value); + case V_Float: + return ByteUtils.toFloat(value); + case V_Double: + return ByteUtils.toDouble(value); + case V_Boolean: + return ByteUtils.toBoolean(value); + case V_BigInteger: + return new BigInteger(value); + default: + throw new IllegalArgumentException(); + } + } + + public static ValueType getTypeIndex(Class<?> clazz) { + if (byte[].class.equals(clazz)) { + return ValueType.V_ByteArray; + } else if (String.class.equals(clazz)) { + return ValueType.V_String; + } else if (Byte.class.equals(clazz)) { + return ValueType.V_Byte; + } else if (Short.class.equals(clazz)) { + return ValueType.V_Short; + } else if (Integer.class.equals(clazz)) { + return ValueType.V_Integer; + } else if (Long.class.equals(clazz)) { + return ValueType.V_Long; + } else if (Float.class.equals(clazz)) { + return ValueType.V_Float; + } 
else if (Double.class.equals(clazz)) { + return ValueType.V_Double; + } else if (Boolean.class.equals(clazz)) { + return ValueType.V_Boolean; + } else if (Timestamp.class.equals(clazz)) { + return ValueType.V_Timestamp; + } else if (Date.class.equals(clazz)) { + return ValueType.V_Date; + } else if (Time.class.equals(clazz)) { + return ValueType.V_Time; + } else if (BigDecimal.class.equals(clazz)) { + return ValueType.V_BigDecimal; + } else if (BigInteger.class.equals(clazz)) { + return ValueType.V_BigInteger; + } else if (LocalDateTime.class.equals(clazz)) { + return ValueType.V_LocalDateTime; + } else if (LocalDate.class.equals(clazz)) { + return ValueType.V_LocalDate; + } else if (Duration.class.equals(clazz)) { + return ValueType.V_Duration; + } else if (LocalTime.class.equals(clazz)) { + return ValueType.V_LocalTime; + } else if (Period.class.equals(clazz)) { + return ValueType.V_Period; + } else if (OffsetDateTime.class.equals(clazz)) { + return ValueType.V_OffsetDateTime; + } else { + return ValueType.Unsupported; + } + } + + /** Value Type. */ + public enum ValueType { + V_ByteArray, + V_String, + V_Byte, + V_Short, + V_Integer, + V_Long, + V_Float, + V_Double, + V_Boolean, + V_Timestamp, + V_Date, + V_Time, + V_BigDecimal, + V_BigInteger, + V_LocalDateTime, + V_LocalDate, + V_Duration, + V_LocalTime, + V_Period, + V_OffsetDateTime, + Unsupported + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/source/util/ByteUtils.java b/src/main/java/org/apache/rocketmq/flink/source/util/ByteUtils.java new file mode 100644 index 0000000..6e223a3 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/source/util/ByteUtils.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Utility class for operations related to bytes.
 *
 * <p>All multi-byte conversions read the most significant byte first (big-endian).
 */
public class ByteUtils {

    /**
     * Converts a byte array to an int value.
     *
     * @param bytes byte array
     * @return the int value
     * @throws IllegalArgumentException if the array holds fewer than 4 bytes
     */
    public static int toInt(byte[] bytes) {
        return toInt(bytes, 0);
    }

    /**
     * Converts a byte array to an int value.
     *
     * @param bytes byte array
     * @param offset offset into array
     * @return the int value
     * @throws IllegalArgumentException if the offset is negative or there's not enough room in
     *     the array at the offset indicated.
     */
    public static int toInt(byte[] bytes, int offset) {
        checkRange(bytes, offset, Integer.BYTES);
        int n = 0;
        for (int i = offset; i < offset + Integer.BYTES; i++) {
            n <<= 8;
            n ^= bytes[i] & 0xFF;
        }
        return n;
    }

    /**
     * Convert a byte array to a boolean.
     *
     * @param b array
     * @return True or false.
     * @throws IllegalArgumentException if the array is empty
     */
    public static boolean toBoolean(final byte[] b) {
        return toBoolean(b, 0);
    }

    /**
     * Convert a byte array to a boolean; any non-zero byte is {@code true}.
     *
     * @param b array
     * @param offset offset into array
     * @return True or false.
     * @throws IllegalArgumentException if the offset is negative or past the end of the array
     */
    public static boolean toBoolean(final byte[] b, final int offset) {
        checkRange(b, offset, 1);
        return b[offset] != (byte) 0;
    }

    /**
     * Converts a byte array to a long value.
     *
     * @param bytes array
     * @return the long value
     * @throws IllegalArgumentException if the array holds fewer than 8 bytes
     */
    public static long toLong(byte[] bytes) {
        return toLong(bytes, 0);
    }

    /**
     * Converts a byte array to a long value.
     *
     * @param bytes array of bytes
     * @param offset offset into array
     * @return the long value
     * @throws IllegalArgumentException if the offset is negative or there's not enough room in
     *     the array at the offset indicated.
     */
    public static long toLong(byte[] bytes, int offset) {
        checkRange(bytes, offset, Long.BYTES);
        long l = 0;
        for (int i = offset; i < offset + Long.BYTES; i++) {
            l <<= 8;
            l ^= bytes[i] & 0xFF;
        }
        return l;
    }

    /**
     * Presumes float encoded as IEEE 754 floating-point "single format".
     *
     * @param bytes byte array
     * @return Float made from passed byte array.
     */
    public static float toFloat(byte[] bytes) {
        return toFloat(bytes, 0);
    }

    /**
     * Presumes float encoded as IEEE 754 floating-point "single format".
     *
     * @param bytes array to convert
     * @param offset offset into array
     * @return Float made from passed byte array.
     * @throws IllegalArgumentException if the offset is negative or fewer than 4 bytes are
     *     available at the offset
     */
    public static float toFloat(byte[] bytes, int offset) {
        return Float.intBitsToFloat(toInt(bytes, offset));
    }

    /**
     * Parse a byte array to double.
     *
     * @param bytes byte array
     * @return Return double made from passed bytes.
     */
    public static double toDouble(final byte[] bytes) {
        return toDouble(bytes, 0);
    }

    /**
     * Parse a byte array to double.
     *
     * @param bytes byte array
     * @param offset offset where double is
     * @return Return double made from passed bytes.
     * @throws IllegalArgumentException if the offset is negative or fewer than 8 bytes are
     *     available at the offset
     */
    public static double toDouble(final byte[] bytes, final int offset) {
        return Double.longBitsToDouble(toLong(bytes, offset));
    }

    /**
     * Converts a byte array to a short value.
     *
     * @param bytes byte array
     * @return the short value
     * @throws IllegalArgumentException if the array holds fewer than 2 bytes
     */
    public static short toShort(byte[] bytes) {
        return toShort(bytes, 0);
    }

    /**
     * Converts a byte array to a short value.
     *
     * @param bytes byte array
     * @param offset offset into array
     * @return the short value
     * @throws IllegalArgumentException if the offset is negative or there's not enough room in
     *     the array at the offset indicated.
     */
    public static short toShort(byte[] bytes, int offset) {
        checkRange(bytes, offset, Short.BYTES);
        return (short) (((bytes[offset] & 0xFF) << 8) | (bytes[offset + 1] & 0xFF));
    }

    /**
     * Converts a whole byte array to a {@link BigDecimal}; see {@link #toBigDecimal(byte[], int,
     * int)} for the layout.
     *
     * @param bytes encoded decimal, may be {@code null}
     * @return the decimal, or {@code null} if {@code bytes} is {@code null} or too short
     */
    public static BigDecimal toBigDecimal(byte[] bytes) {
        // Check for null before reading bytes.length so this overload honours the same
        // "null in, null out" contract as the three-argument one.
        return null == bytes ? null : toBigDecimal(bytes, 0, bytes.length);
    }

    /**
     * Converts a slice of a byte array to a {@link BigDecimal}.
     *
     * <p>The slice starts with a 4-byte big-endian scale followed by the two's-complement
     * unscaled value, so it must be at least 5 bytes long.
     *
     * @param bytes array containing the encoded decimal, may be {@code null}
     * @param offset start of the slice
     * @param length number of bytes in the slice
     * @return the decimal, or {@code null} if the array is {@code null} or the slice is too short
     *     or does not lie inside the array
     */
    public static BigDecimal toBigDecimal(byte[] bytes, int offset, int length) {
        final int minLength = Integer.BYTES + 1;
        // "length <= bytes.length - offset" cannot overflow once offset is known to be >= 0.
        if (bytes == null
                || offset < 0
                || length < minLength
                || length > bytes.length - offset) {
            return null;
        }
        int scale = toInt(bytes, offset);
        byte[] unscaled = new byte[length - Integer.BYTES];
        System.arraycopy(bytes, offset + Integer.BYTES, unscaled, 0, unscaled.length);
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    // ---------------------------------------------------------------------------------------------

    /**
     * Verifies that {@code length} bytes can be read from {@code bytes} starting at {@code
     * offset}.
     *
     * <p>The comparison is written as a subtraction so that neither a negative nor a very large
     * offset can slip through via int overflow of {@code offset + length}.
     */
    private static void checkRange(final byte[] bytes, final int offset, final int length) {
        if (offset < 0 || offset > bytes.length - length) {
            throw new IllegalArgumentException(
                    "offset ("
                            + offset
                            + ") + length ("
                            + length
                            + ") exceed the"
                            + " capacity of the array: "
                            + bytes.length);
        }
    }
}
*/ +public class StringSerializer { + + public static TimestampConverter timestampConverter = new TimestampConverter(3); + private static final BASE64Decoder decoder = new BASE64Decoder(); + + public static Object deserialize( + String value, + ByteSerializer.ValueType type, + DataType dataType, + Set<String> nullValues) { + return deserialize(value, type, dataType, nullValues, false); + } + + public static Object deserialize( + String value, + ByteSerializer.ValueType type, + DataType dataType, + Set<String> nullValues, + Boolean isRGData) { + if (null != nullValues && nullValues.contains(value)) { + return null; + } + switch (type) { + case V_ByteArray: // byte[] + if (isRGData) { + byte[] bytes = null; + try { + bytes = decoder.decodeBuffer(value); + } catch (Exception e) { + // + } + return bytes; + } else { + return value.getBytes(); + } + case V_String: + return BinaryStringData.fromString(value); + case V_Byte: // byte + return null == value ? null : Byte.parseByte(value); + case V_Short: + return null == value ? null : Short.parseShort(value); + case V_Integer: + return null == value ? null : Integer.parseInt(value); + case V_Long: + return null == value ? null : Long.parseLong(value); + case V_Float: + return null == value ? null : Float.parseFloat(value); + case V_Double: + return null == value ? null : Double.parseDouble(value); + case V_Boolean: + return null == value ? null : parseBoolean(value); + case V_Timestamp: // sql.Timestamp encoded as long + if (isRGData) { + return null == value ? null : Long.parseLong(value); + } + if (null == value) { + return null; + } else { + try { + return timestampConverter.toInternal(new Timestamp(Long.parseLong(value))); + } catch (NumberFormatException e) { + return timestampConverter.toInternal(Timestamp.valueOf(value)); + } + } + case V_Date: // sql.Date encoded as long + if (isRGData) { + return null == value ? null : Long.parseLong(value); + } + return null == value + ? 
null + : DataFormatConverters.DateConverter.INSTANCE.toInternal( + Date.valueOf(value)); + case V_Time: // sql.Time encoded as long + if (isRGData) { + return null == value ? null : Long.parseLong(value); + } + return null == value + ? null + : DataFormatConverters.TimeConverter.INSTANCE.toInternal( + new Time(Long.parseLong(value))); + case V_BigDecimal: + DecimalType decimalType = (DecimalType) dataType.getLogicalType(); + return value == null + ? null + : DecimalData.fromBigDecimal( + new BigDecimal(value), + decimalType.getPrecision(), + decimalType.getScale()); + case V_BigInteger: + return null == value ? null : new BigInteger(value); + + default: + throw new IllegalArgumentException(); + } + } + + public static Object deserialize( + String value, ByteSerializer.ValueType type, DataType dataType, Boolean isRGData) { + return deserialize(value, type, dataType, null, isRGData); + } + + public static Boolean parseBoolean(String s) { + if (s != null) { + if (s.equalsIgnoreCase("true") || s.equalsIgnoreCase("false")) { + return Boolean.valueOf(s); + } + + if (s.equals("1")) { + return Boolean.TRUE; + } + + if (s.equals("0")) { + return Boolean.FALSE; + } + } + + throw new IllegalArgumentException(); + } +} diff --git a/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000..32de8b2 --- /dev/null +++ b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.rocketmq.flink.source.table.RocketMQDynamicTableSourceFactory \ No newline at end of file diff --git a/src/test/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRowDeserializationSchemaTest.java b/src/test/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRowDeserializationSchemaTest.java new file mode 100644 index 0000000..a904b04 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRowDeserializationSchemaTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.flink.source.reader.deserializer; + +import org.apache.rocketmq.common.message.MessageExt; + +import org.apache.flink.api.common.serialization.DeserializationSchema.InitializationContext; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Collector; + +import org.junit.Test; +import org.powermock.reflect.Whitebox; + +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; + +/** Test for {@link RocketMQRowDeserializationSchema}. */ +public class RocketMQRowDeserializationSchemaTest { + + @Test + public void testDeserialize() { + TableSchema tableSchema = + new TableSchema.Builder() + .field("int", DataTypes.INT()) + .field("varchar", DataTypes.VARCHAR(100)) + .field("bool", DataTypes.BOOLEAN()) + .field("char", DataTypes.CHAR(5)) + .field("tinyint", DataTypes.TINYINT()) + .field("decimal", DataTypes.DECIMAL(10, 5)) + .field("smallint", DataTypes.SMALLINT()) + .field("bigint", DataTypes.BIGINT()) + .field("float", DataTypes.FLOAT()) + .field("double", DataTypes.DOUBLE()) + .field("date", DataTypes.DATE()) + .field("time", DataTypes.TIME()) + .field("timestamp", DataTypes.TIMESTAMP()) + .build(); + RocketMQRowDeserializationSchema recordDeserializer = + new RocketMQRowDeserializationSchema(tableSchema, new HashMap<>(), false, null); + RowDeserializationSchema sourceDeserializer = mock(RowDeserializationSchema.class); + InitializationContext initializationContext = mock(InitializationContext.class); + doNothing().when(sourceDeserializer).open(initializationContext); + Whitebox.setInternalState(recordDeserializer, "deserializationSchema", sourceDeserializer); + recordDeserializer.open(initializationContext); + MessageExt firstMsg 
= + new MessageExt( + 1, + System.currentTimeMillis(), + InetSocketAddress.createUnresolved("localhost", 8080), + System.currentTimeMillis(), + InetSocketAddress.createUnresolved("localhost", 8088), + "184019387"); + firstMsg.setBody("test_deserializer_raw_messages_1".getBytes()); + MessageExt secondMsg = + new MessageExt( + 1, + System.currentTimeMillis(), + InetSocketAddress.createUnresolved("localhost", 8081), + System.currentTimeMillis(), + InetSocketAddress.createUnresolved("localhost", 8087), + "284019387"); + secondMsg.setBody("test_deserializer_raw_messages_2".getBytes()); + MessageExt thirdMsg = + new MessageExt( + 1, + System.currentTimeMillis(), + InetSocketAddress.createUnresolved("localhost", 8082), + System.currentTimeMillis(), + InetSocketAddress.createUnresolved("localhost", 8086), + "384019387"); + thirdMsg.setBody("test_deserializer_raw_messages_3".getBytes()); + List<MessageExt> messages = Arrays.asList(firstMsg, secondMsg, thirdMsg); + Collector<RowData> collector = mock(Collector.class); + recordDeserializer.deserialize(messages, collector); + + assertEquals(3, recordDeserializer.getBytesMessages().size()); + assertEquals(firstMsg.getBody(), recordDeserializer.getBytesMessages().get(0).getData()); + assertEquals( + String.valueOf(firstMsg.getStoreTimestamp()), + recordDeserializer.getBytesMessages().get(0).getProperty("__store_timestamp__")); + assertEquals( + String.valueOf(firstMsg.getBornTimestamp()), + recordDeserializer.getBytesMessages().get(0).getProperty("__born_timestamp__")); + assertEquals( + String.valueOf(firstMsg.getQueueId()), + recordDeserializer.getBytesMessages().get(0).getProperty("__queue_id__")); + assertEquals( + String.valueOf(firstMsg.getQueueOffset()), + recordDeserializer.getBytesMessages().get(0).getProperty("__queue_offset__")); + assertEquals(secondMsg.getBody(), recordDeserializer.getBytesMessages().get(1).getData()); + assertEquals( + String.valueOf(secondMsg.getStoreTimestamp()), + 
recordDeserializer.getBytesMessages().get(1).getProperty("__store_timestamp__")); + assertEquals( + String.valueOf(secondMsg.getBornTimestamp()), + recordDeserializer.getBytesMessages().get(1).getProperty("__born_timestamp__")); + assertEquals( + String.valueOf(secondMsg.getQueueId()), + recordDeserializer.getBytesMessages().get(1).getProperty("__queue_id__")); + assertEquals( + String.valueOf(secondMsg.getQueueOffset()), + recordDeserializer.getBytesMessages().get(1).getProperty("__queue_offset__")); + assertEquals(thirdMsg.getBody(), recordDeserializer.getBytesMessages().get(2).getData()); + assertEquals( + String.valueOf(thirdMsg.getStoreTimestamp()), + recordDeserializer.getBytesMessages().get(2).getProperty("__store_timestamp__")); + assertEquals( + String.valueOf(thirdMsg.getBornTimestamp()), + recordDeserializer.getBytesMessages().get(2).getProperty("__born_timestamp__")); + assertEquals( + String.valueOf(thirdMsg.getQueueId()), + recordDeserializer.getBytesMessages().get(2).getProperty("__queue_id__")); + assertEquals( + String.valueOf(thirdMsg.getQueueOffset()), + recordDeserializer.getBytesMessages().get(2).getProperty("__queue_offset__")); + } +}
