This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 44bb9bd16d11575f881ade0793d858c12ce0911f
Author: lizhiboo <[email protected]>
AuthorDate: Thu Sep 9 12:52:18 2021 +0800

    [ISSUE #801]Rocketmq connector sink for hudi (#800)

    * support rocketmq sink to hudi
    * support rocketmq sink to hudi debug
    * remove unused code
    * support task divide
    * support divide strategy by topic queue
    * support divide strategy by topic queue
    * add log4j.properties
    * upgrade javalin to 2.8.0
    * add log
    * add log
    * add log
    * add quick stat in READMQ.md
    * support start hudi sink by spark-submit
    * code style
    * code style
    * code style
    * code style
    * code style
    * code style
    * code style
---
 README.md | 78 ++++++
 pom.xml | 287 +++++++++++++++++++++
 .../rocketmq/connect/hudi/config/CloneUtils.java | 50 ++++
 .../rocketmq/connect/hudi/config/ConfigUtil.java | 70 +++++
 .../connect/hudi/config/HudiConnectConfig.java | 173 +++++++++++++
 .../connect/hudi/config/SinkConnectConfig.java | 139 ++++++++++
 .../apache/rocketmq/connect/hudi/config/Utils.java | 75 ++++++
 .../connect/hudi/connector/HudiSinkConnector.java | 250 ++++++++++++++++++
 .../connect/hudi/connector/HudiSinkTask.java | 111 ++++++++
 .../apache/rocketmq/connect/hudi/sink/Updater.java | 239 +++++++++++++++++
 .../connect/hudi/strategy/ITaskDivideStrategy.java | 27 ++
 .../hudi/strategy/TaskDivideByQueueStrategy.java | 80 ++++++
 .../hudi/strategy/TaskDivideStrategyFactory.java | 25 ++
 style/rmq_checkstyle.xml | 135 ++++++++++
 14 files changed, 1739 insertions(+)

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..509f3b2
--- /dev/null
+++ b/README.md
@@ -0,0 +1,78 @@
+# rocketmq-connect-hudi
+
+## Building rocketmq-connect-hudi
+```
+mvn clean install -DskipTests -U
+```
+Copy the packaged rocketmq-connect-hudi-0.0.1-SNAPSHOT-jar-with-dependencies.jar from the target directory into the connector-plugin directory configured in the connector-runtime's connect.conf.
+## Known installation issues
+
+rocketmq-connect-hudi currently uses Hudi version 0.8.0.
+
+## Starting rocketmq-connect-hudi
+
+First, start connect-runtime; see the run_work.sh script in rocketmq-connect-runtime.
+* Starting **hudi-sink-connector**
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-hudi-sink-connector-name}
+?config='{"connector-class":"org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector","topicNames":"topicc","tablePath":"file:///tmp/hudi_connector_test","tableName":"hudi_connector_test_table","insertShuffleParallelism":"2","upsertShuffleParallelism":"2","deleteParallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter","source-rocketmq":"127.0.0.1:9876","src-cluster":"DefaultCluster","refresh-interval":"10000","schemaPath":"/Users/o [...]
+```
+On a successful start, the following log line is printed:
+```
+2021-09-06 16:23:14 INFO pool-2-thread-1 - Open HoodieJavaWriteClient successfully
+```
+>**Note:** `rocketmq-hudi-connect` depends on a running `rocketmq-connect-runtime`. Place the packaged `jar` in the path configured by `pluginPaths` in the `runtime` project's `connect.conf` file before sending the start request above.
+
+## Stopping rocketmq-connect-hudi
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-hudi-connector-name}/stop
+```
+
+## rocketmq-connect-hudi parameters
+* **hudi-sink-connector parameters**
+
+|Parameter | Type | Required | Description | Example |
+|---|---|---|---|---|
+|connector-class | String | Yes | Sink connector class | HudiSinkConnector|
+|tablePath | String | Yes | Path of the Hudi table to sink to | file:///tmp/hudi_connector_test |
+|tableName | String | Yes | Name of the Hudi table to sink to | hudi_connector_test_table |
+|insertShuffleParallelism | int | Yes | Hudi insert parallelism | 2 |
+|upsertShuffleParallelism | int | Yes | Hudi upsert parallelism | 2 |
+|deleteParallelism | int | Yes | Hudi delete parallelism | 2 |
+|topicNames | String | Yes | By default, each table in a data source maps to one RocketMQ topic name; the name must match the database table name | jdbc_hudi |
+|task-divide-strategy | Integer | No | Task division strategy; defaults to 0, which assigns tasks by topic, where each table is one topic | 0 |
+|task-parallelism | Integer | No | Task parallelism; defaults to 1; the number of tasks a topic is split into | 2 |
+|source-cluster | String | Yes | RocketMQ nameserver address the sink connects to for routing information | 172.17.0.1:10911 |
+|source-rocketmq | String | Yes | RocketMQ broker cluster address the sink connects to for routing information | 127.0.0.1:9876 |
+|source-record-converter | String | Yes | Converter used to parse source data | org.apache.rocketmq.connect.runtime.converter.RocketMQConverter |
+|src-cluster | String | No | Source cluster | DefaultCluster |
+|refresh-interval | String | No | Sink refresh interval, in ms | 10000 |
+|schemaPath | String | Yes | Path to the sink's schema file | /Users/osgoo/Downloads/user.avsc |
+
+
+Example configuration:
+```js
+{
+    "connector-class": "org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector",
+    "topicNames": "topicc",
+    "tablePath": "file:///tmp/hudi_connector_test",
+    "tableName": "hudi_connector_test_table",
+    "insertShuffleParallelism": "2",
+    "upsertShuffleParallelism": "2",
+    "deleteParallelism": "2",
+    "source-record-converter": "org.apache.rocketmq.connect.runtime.converter.RocketMQConverter",
+    "source-rocketmq": "127.0.0.1:9876",
+    "src-cluster": "DefaultCluster",
+    "refresh-interval": "10000",
+    "schemaPath": "/Users/osgoo/Downloads/user.avsc"
+}
+```
+
+* **Starting the task with spark-submit**
+Package connect-runtime, then submit the task with spark-submit:
+```
+nohup sh spark-submit --class org.apache.rocketmq.connect.runtime.ConnectStartup --conf "spark.driver.extraJavaOptions=-Dlogback.configurationFile=logback.xml" --files /xxx/conf/connect.conf,/xxx/conf/log4j.properties --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.8.0,org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hudi:hudi-java-client:0.8.0,org.apache.parquet:parquet-avro:1.10.1,org.apache.avro:avro:1.10.2,com.alibaba:fastjson:1.2.51,org.reflections:reflections:0.9.11,org.apa [...]
+```
+For the remaining steps, see the rocketmq-connect-hudi startup steps.
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..97c8785
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,287 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-hudi</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+        <!-- Compiler settings properties -->
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+        <rocketmq.version>4.5.2</rocketmq.version>
+
+        <hudi.version>0.8.0</hudi.version>
+        <avro.version>1.10.2</avro.version>
+        <parquet.version>1.10.1</parquet.version>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <outputDirectory>${project.build.directory}/lib</outputDirectory>
+                    <excludeTransitive>false</excludeTransitive>
+                    <stripVersion>true</stripVersion>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+
</configuration> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.19.1</version> + <configuration> + <argLine>-Xms512m -Xmx1024m</argLine> + <forkMode>always</forkMode> + <includes> + <include>**/*Test.java</include> + </includes> + </configuration> + </plugin> + <plugin> + <artifactId>maven-site-plugin</artifactId> + <version>3.6</version> + <configuration> + <locales>en_US</locales> + <outputEncoding>UTF-8</outputEncoding> + <inputEncoding>UTF-8</inputEncoding> + </configuration> + </plugin> + <plugin> + <artifactId>maven-source-plugin</artifactId> + <version>3.0.1</version> + <executions> + <execution> + <id>attach-sources</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <version>0.12</version> + <configuration> + <excludes> + <exclude>README.md</exclude> + </excludes> + </configuration> + </plugin> + <plugin> + <artifactId>maven-javadoc-plugin</artifactId> + <version>2.10.4</version> + <configuration> + <charset>UTF-8</charset> + <locale>en_US</locale> + <excludePackageNames>io.openmessaging.internal</excludePackageNames> + </configuration> + <executions> + <execution> + <id>aggregate</id> + <goals> + <goal>aggregate</goal> + </goals> + <phase>site</phase> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>3.0.2</version> + <configuration> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <version>3.0.4</version> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>3.0.0</version> + <configuration> + <archive> + <!-- The Main Class Here doesn't make a lot sense since it was dynamically loaded--> + <manifest> + 
<mainClass>org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector</mainClass> + </manifest> + </archive> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>2.17</version> + <executions> + <execution> + <id>verify</id> + <phase>verify</phase> + <configuration> + <configLocation>style/rmq_checkstyle.xml</configLocation> + <encoding>UTF-8</encoding> + <consoleOutput>true</consoleOutput> + <failsOnError>true</failsOnError> + <includeTestSourceDirectory>false</includeTestSourceDirectory> + <includeTestResources>false</includeTestResources> + </configuration> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <version>2.6.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>2.6.3</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <version>1.12</version> + </dependency> + <dependency> + <groupId>io.openmessaging</groupId> + <artifactId>openmessaging-connector</artifactId> + <version>0.1.1</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>io.openmessaging</groupId> + <artifactId>openmessaging-api</artifactId> + <version>0.3.1-alpha</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + 
<version>1.7.7</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-client</artifactId> + <version>${rocketmq.version}</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-tools</artifactId> + <version>${rocketmq.version}</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-remoting</artifactId> + <version>${rocketmq.version}</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-openmessaging</artifactId> + <version>4.3.2</version> + </dependency> + <dependency> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-java-client</artifactId> + <version>${hudi.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-avro</artifactId> + <version>${parquet.version}</version> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>${avro.version}</version> + </dependency> + + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> + <version>1.2.51</version> + </dependency> + + <!-- used for spark-submit --> + <dependency> + <groupId>org.pentaho</groupId> + <artifactId>pentaho-aggdesigner-algorithm</artifactId> + <version>5.1.5-jhyde</version> + </dependency> + <dependency> + <groupId>asm</groupId> + <artifactId>asm</artifactId> + <version>3.2</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <version>2.3.7</version> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>failureaccess</artifactId> + <version>1.0</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>3.3.1</version> + </dependency> + + </dependencies> +</project> \ No newline at end of file diff 
--git a/src/main/java/org/apache/rocketmq/connect/hudi/config/CloneUtils.java b/src/main/java/org/apache/rocketmq/connect/hudi/config/CloneUtils.java new file mode 100644 index 0000000..dc3605d --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/hudi/config/CloneUtils.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.connect.hudi.config; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.io.ObjectInputStream; + +public class CloneUtils { + private static final Logger log = LoggerFactory.getLogger(CloneUtils.class); + + @SuppressWarnings("unchecked") + public static <T extends Serializable> T clone(T obj) { + T clonedObj = null; + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(obj); + oos.close(); + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + ObjectInputStream ois = new ObjectInputStream(bais); + clonedObj = (T) ois.readObject(); + ois.close(); + } catch (Exception e) { + log.error("Clone occur exception", e); + } + return clonedObj; + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/config/ConfigUtil.java b/src/main/java/org/apache/rocketmq/connect/hudi/config/ConfigUtil.java new file mode 100644 index 0000000..88f8a8e --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/hudi/config/ConfigUtil.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.hudi.config; + +import io.openmessaging.KeyValue; + +import java.lang.reflect.Method; + +public class ConfigUtil { + public static <T> void load(KeyValue props, Object object) { + + properties2Object(props, object); + } + + private static <T> void properties2Object(final KeyValue p, final Object object) { + + Method[] methods = object.getClass().getMethods(); + for (Method method : methods) { + String mn = method.getName(); + if (mn.startsWith("set")) { + try { + String tmp = mn.substring(4); + String first = mn.substring(3, 4); + + String key = first.toLowerCase() + tmp; + String property = p.getString(key); + if (property != null) { + Class<?>[] pt = method.getParameterTypes(); + if (pt != null && pt.length > 0) { + String cn = pt[0].getSimpleName(); + Object arg; + if (cn.equals("int") || cn.equals("Integer")) { + arg = Integer.parseInt(property); + } else if (cn.equals("long") || cn.equals("Long")) { + arg = Long.parseLong(property); + } else if (cn.equals("double") || cn.equals("Double")) { + arg = Double.parseDouble(property); + } else if (cn.equals("boolean") || cn.equals("Boolean")) { + arg = Boolean.parseBoolean(property); + } else if (cn.equals("float") || cn.equals("Float")) { + arg = Float.parseFloat(property); + } else if (cn.equals("String")) { + arg = property; + } else { + continue; + } + method.invoke(object, arg); + } + } + } catch (Throwable ignored) { + } + } + } + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/config/HudiConnectConfig.java b/src/main/java/org/apache/rocketmq/connect/hudi/config/HudiConnectConfig.java new file mode 100644 index 0000000..4c04605 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/hudi/config/HudiConnectConfig.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.hudi.config; + + +import org.apache.avro.Schema; + +import java.util.HashSet; +import java.util.Set; + + +public class HudiConnectConfig { + + protected String tableType = "COPY_ON_WRITE"; + + protected String tablePath; + + protected String tableName; + + protected int insertShuffleParallelism = 2; + + protected int upsertShuffleParallelism = 2; + + protected int deleteParallelism = 2; + + protected String srcRecordConverter; + + protected String topicNames; + + protected String indexType = "INMEMORY"; + + protected String schemaPath; + + public Schema schema; + + public static final String CONN_TASK_PARALLELISM = "task-parallelism"; + public static final String CONN_TASK_DIVIDE_STRATEGY = "task-divide-strategy"; + public static final String CONN_WHITE_LIST = "whiteDataBase"; + public static final String CONN_SOURCE_RECORD_CONVERTER = "source-record-converter"; + + public static final String CONN_HUDI_TABLE_TYPE = "tableType"; + public static final String CONN_HUDI_TABLE_PATH = "tablePath"; + public static final String CONN_HUDI_TABLE_NAME = "tableName"; + public static final String CONN_HUDI_INSERT_SHUFFLE_PARALLELISM = "insertShuffleParallelism"; + public static final String CONN_HUDI_UPSERT_SHUFFLE_PARALLELISM = 
"upsertShuffleParallelism"; + public static final String CONN_HUDI_DELETE_PARALLELISM = "deleteParallelism"; + + public static final String CONN_TOPIC_NAMES = "topicNames"; + public static final String CONN_TOPIC_QUEUES = "topicQueues"; + public static final String CONN_SCHEMA_PATH = "schemaPath"; + + public static final String CONN_TOPIC_ROUTE_INFO = "topicRouterInfo"; + + public static final String CONN_SOURCE_RMQ = "source-rocketmq"; + public static final String CONN_SOURCE_CLUSTER = "source-cluster"; + public static final String REFRESH_INTERVAL = "refresh.interval"; + + public static final Set<String> REQUEST_CONFIG = new HashSet<String>() { + { + add(CONN_HUDI_TABLE_PATH); + add(CONN_HUDI_TABLE_NAME); + add(CONN_HUDI_INSERT_SHUFFLE_PARALLELISM); + add(CONN_HUDI_UPSERT_SHUFFLE_PARALLELISM); + add(CONN_HUDI_DELETE_PARALLELISM); + add(CONN_SOURCE_RECORD_CONVERTER); + add(CONN_TOPIC_NAMES); + add(CONN_SCHEMA_PATH); + } + }; + + public String getTableType() { + return tableType; + } + + public void setTableType(String tableType) { + this.tableType = tableType; + } + + public String getTablePath() { + return tablePath; + } + + public void setTablePath(String tablePath) { + this.tablePath = tablePath; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public int getInsertShuffleParallelism() { + return insertShuffleParallelism; + } + + public void setInsertShuffleParallelism(int insertShuffleParallelism) { + this.insertShuffleParallelism = insertShuffleParallelism; + } + + public int getUpsertShuffleParallelism() { + return upsertShuffleParallelism; + } + + public void setUpsertShuffleParallelism(int upsertShuffleParallelism) { + this.upsertShuffleParallelism = upsertShuffleParallelism; + } + + public int getDeleteParallelism() { + return deleteParallelism; + } + + public void setDeleteParallelism(int deleteParallelism) { + this.deleteParallelism = deleteParallelism; + 
} + + public String getSrcRecordConverter() { + return srcRecordConverter; + } + + public void setSrcRecordConverter(String srcRecordConverter) { + this.srcRecordConverter = srcRecordConverter; + } + + public String getTopicNames() { + return topicNames; + } + + public void setTopicNames(String topicNames) { + this.topicNames = topicNames; + } + + public String getIndexType() { + return indexType; + } + + public void setIndexType(String indexType) { + this.indexType = indexType; + } + + public String getSchemaPath() { + return schemaPath; + } + + public void setSchemaPath(String schemaPath) { + this.schemaPath = schemaPath; + } + + public Schema getSchema() { + return schema; + } + + public void setSchema(Schema schema) { + this.schema = schema; + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/config/SinkConnectConfig.java b/src/main/java/org/apache/rocketmq/connect/hudi/config/SinkConnectConfig.java new file mode 100644 index 0000000..943df40 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/hudi/config/SinkConnectConfig.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.rocketmq.connect.hudi.config;
+
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+
+public class SinkConnectConfig extends HudiConnectConfig {
+    private Set<String> whiteList;
+    private String srcNamesrvs;
+    private String srcCluster;
+    private long refreshInterval;
+    private Map<String, Set<MessageQueue>> topicRouteMap;
+    public int taskParallelism;
+    private String taskDivideStrategy;
+
+    public SinkConnectConfig(){
+    }
+
+    public void validate(KeyValue config) {
+        buildWhiteList(config);
+        this.tablePath = config.getString(HudiConnectConfig.CONN_HUDI_TABLE_PATH);
+        this.tableName = config.getString(HudiConnectConfig.CONN_HUDI_TABLE_NAME);
+        this.insertShuffleParallelism = config.getInt(HudiConnectConfig.CONN_HUDI_INSERT_SHUFFLE_PARALLELISM);
+        this.deleteParallelism = config.getInt(HudiConnectConfig.CONN_HUDI_DELETE_PARALLELISM);
+        this.upsertShuffleParallelism = config.getInt(HudiConnectConfig.CONN_HUDI_UPSERT_SHUFFLE_PARALLELISM);
+        this.setSrcRecordConverter(config.getString(HudiConnectConfig.CONN_SOURCE_RECORD_CONVERTER));
+        this.setTopicNames(config.getString(HudiConnectConfig.CONN_TOPIC_NAMES));
+        this.setSchemaPath(config.getString(HudiConnectConfig.CONN_SCHEMA_PATH));
+
+        this.srcNamesrvs = config.getString(HudiConnectConfig.CONN_SOURCE_RMQ);
+        this.srcCluster = config.getString(HudiConnectConfig.CONN_SOURCE_CLUSTER);
+        this.refreshInterval = config.getLong(HudiConnectConfig.REFRESH_INTERVAL, 3);
+
+    }
+
+    private void buildWhiteList(KeyValue config) {
+        this.whiteList = new HashSet<>();
+        String whiteListStr = config.getString(HudiConnectConfig.CONN_TOPIC_NAMES, "");
+        String[] wl = whiteListStr.trim().split(",");
+        if (whiteListStr.trim().isEmpty() || wl.length <= 0)
+            throw new IllegalArgumentException("White list must not be empty.");
+        else {
+            this.whiteList.clear();
+            for (String t : wl) {
+                this.whiteList.add(t.trim());
+            }
+        }
+    }
+
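A minimal standalone sketch of the comma-separated `topicNames` parsing that `buildWhiteList` performs, assuming blank entries should be rejected rather than stored. `TopicWhiteListSketch` and `parse` are hypothetical names used only for illustration and are not part of this commit. One detail worth keeping in mind: in Java, `"".split(",")` returns a one-element array holding the empty string, so an array-length check on its own does not detect an empty list.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper (not in the commit): parses a comma-separated topic list.
final class TopicWhiteListSketch {

    // Splits on commas, trims each entry, drops blanks, and removes duplicates
    // while keeping the order in which topics first appear.
    static Set<String> parse(String topicNames) {
        if (topicNames == null) {
            throw new IllegalArgumentException("White list must not be empty.");
        }
        Set<String> topics = Arrays.stream(topicNames.split(","))
            .map(String::trim)
            .filter(t -> !t.isEmpty())
            .collect(Collectors.toCollection(LinkedHashSet::new));
        if (topics.isEmpty()) {
            throw new IllegalArgumentException("White list must not be empty.");
        }
        return topics;
    }

    public static void main(String[] args) {
        // prints [topicA, topicB]
        System.out.println(parse("topicA, topicB,topicA"));
    }
}
```

Filtering blanks before the emptiness check means inputs such as `""` or `" , "` fail fast instead of registering an empty topic name.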
+ + public Set<String> getWhiteList() { + return whiteList; + } + + public void setWhiteList(Set<String> whiteList) { + this.whiteList = whiteList; + } + + public String getSrcNamesrvs() { + return this.srcNamesrvs; + } + + public String getSrcCluster() { + return this.srcCluster; + } + + public long getRefreshInterval() { + return this.refreshInterval; + } + + public Map<String, Set<MessageQueue>> getTopicRouteMap() { + return topicRouteMap; + } + + public void setTopicRouteMap(Map<String, Set<MessageQueue>> topicRouteMap) { + this.topicRouteMap = topicRouteMap; + } + + public Set<String> getWhiteTopics() { + return getWhiteList(); + } + + public int getTaskParallelism() { + return taskParallelism; + } + + public void setTaskParallelism(int taskParallelism) { + this.taskParallelism = taskParallelism; + } + + public String getTaskDivideStrategy() { + return taskDivideStrategy; + } + + public void setTaskDivideStrategy(String taskDivideStrategy) { + this.taskDivideStrategy = taskDivideStrategy; + } + + @Override + public String toString() { + return "SinkConnectConfig{" + + "whiteList=" + whiteList + + ", srcNamesrvs='" + srcNamesrvs + '\'' + + ", srcCluster='" + srcCluster + '\'' + + ", refreshInterval=" + refreshInterval + + ", topicRouteMap=" + topicRouteMap + + ", tableType='" + tableType + '\'' + + ", tablePath='" + tablePath + '\'' + + ", tableName='" + tableName + '\'' + + ", insertShuffleParallelism=" + insertShuffleParallelism + + ", upsertShuffleParallelism=" + upsertShuffleParallelism + + ", deleteParallelism=" + deleteParallelism + + ", indexType='" + indexType + '\'' + + ", schemaPath='" + schemaPath + '\'' + + ", schema=" + schema + + '}'; + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/config/Utils.java b/src/main/java/org/apache/rocketmq/connect/hudi/config/Utils.java new file mode 100644 index 0000000..d9bc6fe --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/hudi/config/Utils.java @@ -0,0 +1,75 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.hudi.config; + + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class Utils { + private static final Logger log = LoggerFactory.getLogger(Utils.class); + + public static String createGroupName(String prefix) { + return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString(); + } + + public static String createGroupName(String prefix, String postfix) { + return new StringBuilder().append(prefix).append("-").append(postfix).toString(); + } + + public static String createTaskId(String prefix) { + return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString(); + } + + public static String createInstanceName(String 
namesrvAddr) { + String[] namesrvArray = namesrvAddr.split(";"); + List<String> namesrvList = new ArrayList<>(); + for (String ns : namesrvArray) { + if (!namesrvList.contains(ns)) { + namesrvList.add(ns); + } + } + Collections.sort(namesrvList); + return String.valueOf(namesrvList.toString().hashCode()); + } + + public static List<BrokerData> examineBrokerData(DefaultMQAdminExt defaultMQAdminExt, String topic, + String cluster) throws RemotingException, MQClientException, InterruptedException { + List<BrokerData> brokerList = new ArrayList<>(); + + TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic); + if (topicRouteData.getBrokerDatas() != null) { + for (BrokerData broker : topicRouteData.getBrokerDatas()) { + if (StringUtils.equals(broker.getCluster(), cluster)) { + brokerList.add(broker); + } + } + } + return brokerList; + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkConnector.java b/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkConnector.java new file mode 100644 index 0000000..a496418 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkConnector.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.hudi.connector; + + +import com.alibaba.fastjson.JSONObject; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.Task; +import io.openmessaging.connector.api.sink.SinkConnector; +import io.openmessaging.internal.DefaultKeyValue; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.connect.hudi.config.HudiConnectConfig; +import org.apache.rocketmq.connect.hudi.config.SinkConnectConfig; +import org.apache.rocketmq.connect.hudi.config.CloneUtils; +import org.apache.rocketmq.connect.hudi.config.Utils; +import org.apache.rocketmq.connect.hudi.strategy.ITaskDivideStrategy; +import org.apache.rocketmq.connect.hudi.strategy.TaskDivideStrategyFactory; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + + +public class HudiSinkConnector extends SinkConnector { + private static final Logger log = LoggerFactory.getLogger(HudiSinkConnector.class); + private volatile boolean configValid = false; + private ScheduledExecutorService executor; + private HashMap<String, Set<MessageQueue>> topicRouteMap; + + private 
DefaultMQAdminExt srcMQAdminExt; + private SinkConnectConfig sinkConnectConfig; + + private volatile boolean adminStarted; + + private ScheduledFuture<?> listenerHandle; + public static final String HUDI_CONNECTOR_ADMIN_PREFIX = "HUDI-CONNECTOR-ADMIN"; + public static final String PREFIX = "hudi"; + + public HudiSinkConnector() { + topicRouteMap = new HashMap<>(); + sinkConnectConfig = new SinkConnectConfig(); + executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("HudiSinkConnector-SinkWatcher-%d").daemon(true).build()); + } + + private synchronized void startMQAdminTools() { + if (!configValid || adminStarted) { + return; + } + RPCHook rpcHook = null; + this.srcMQAdminExt = new DefaultMQAdminExt(rpcHook); + this.srcMQAdminExt.setNamesrvAddr(this.sinkConnectConfig.getSrcNamesrvs()); + this.srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(HUDI_CONNECTOR_ADMIN_PREFIX)); + this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(this.sinkConnectConfig.getSrcNamesrvs())); + + try { + log.info("Trying to start srcMQAdminExt"); + this.srcMQAdminExt.start(); + log.info("RocketMQ srcMQAdminExt started"); + + } catch (MQClientException e) { + log.error("Hudi Sink Task start failed for `srcMQAdminExt` exception.", e); + } + + adminStarted = true; + } + + @Override + public String verifyAndSetConfig(KeyValue config) { + for (String requestKey : HudiConnectConfig.REQUEST_CONFIG) { + if (!config.containsKey(requestKey)) { + return "Missing required config key: " + requestKey; + } + } + try { + this.sinkConnectConfig.validate(config); + } catch (IllegalArgumentException e) { + return e.getMessage(); + } + this.configValid = true; + + return ""; + } + + @Override + public void start() { + startMQAdminTools(); + startListener(); + } + + public void startListener() { + listenerHandle = executor.scheduleAtFixedRate(new Runnable() { + boolean first = true; + HashMap<String, Set<MessageQueue>> origin = null; + + @Override + public void 
run() { + buildRoute(); + if (first) { + origin = CloneUtils.clone(topicRouteMap); + first = false; + } + if (!compare(origin, topicRouteMap)) { + context.requestTaskReconfiguration(); + origin = CloneUtils.clone(topicRouteMap); + } + } + }, sinkConnectConfig.getRefreshInterval(), sinkConnectConfig.getRefreshInterval(), TimeUnit.SECONDS); + } + + public boolean compare(Map<String, Set<MessageQueue>> origin, Map<String, Set<MessageQueue>> updated) { + if (origin.size() != updated.size()) { + return false; + } + for (Map.Entry<String, Set<MessageQueue>> entry : origin.entrySet()) { + if (!updated.containsKey(entry.getKey())) { + return false; + } + Set<MessageQueue> originTasks = entry.getValue(); + Set<MessageQueue> updateTasks = updated.get(entry.getKey()); + if (originTasks.size() != updateTasks.size()) { + return false; + } + + if (!originTasks.containsAll(updateTasks)) { + return false; + } + } + + return true; + } + + public void buildRoute() { + String srcCluster = this.sinkConnectConfig.getSrcCluster(); + try { + for (String topic : this.sinkConnectConfig.getWhiteList()) { + + // different from BrokerData with cluster field, which can ensure the brokerData is from expected cluster. + // QueueData use brokerName as unique info on cluster of rocketmq. so when we want to get QueueData of + // expected cluster, we should get brokerNames of expected cluster, and then filter queueDatas. 
+ List<BrokerData> brokerList = Utils.examineBrokerData(this.srcMQAdminExt, topic, srcCluster); + Set<String> brokerNameSet = new HashSet<String>(); + for (BrokerData b : brokerList) { + brokerNameSet.add(b.getBrokerName()); + } + + TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic); + if (!topicRouteMap.containsKey(topic)) { + topicRouteMap.put(topic, new HashSet<>(16)); + } + for (QueueData qd : topicRouteData.getQueueDatas()) { + if (brokerNameSet.contains(qd.getBrokerName())) { + for (int i = 0; i < qd.getReadQueueNums(); i++) { + MessageQueue taskTopicInfo = new MessageQueue(topic, qd.getBrokerName(), i); + topicRouteMap.get(topic).add(taskTopicInfo); + } + } + } + } + } catch (Exception e) { + log.error("Fetch topic list error.", e); + } finally { + srcMQAdminExt.shutdown(); + } + } + + + /** + * We need to reason why we don't call srcMQAdminExt.shutdown() here, and why + * it can be applied to srcMQAdminExt + */ + @Override + public void stop() { + listenerHandle.cancel(true); + srcMQAdminExt.shutdown(); + } + + @Override + public void pause() { + + } + + @Override + public void resume() { + + } + + @Override + public Class<? 
extends Task> taskClass() { + return HudiSinkTask.class; + } + + @Override + public List<KeyValue> taskConfigs() { + log.info("List.start"); + if (!configValid) { + return new ArrayList<KeyValue>(); + } + startMQAdminTools(); + buildRoute(); + DefaultKeyValue defaultKeyValue = new DefaultKeyValue(); + defaultKeyValue.put(HudiConnectConfig.CONN_HUDI_TABLE_PATH, sinkConnectConfig.getTablePath()); + defaultKeyValue.put(HudiConnectConfig.CONN_HUDI_TABLE_NAME, sinkConnectConfig.getTableName()); + defaultKeyValue.put(HudiConnectConfig.CONN_HUDI_INSERT_SHUFFLE_PARALLELISM, sinkConnectConfig.getInsertShuffleParallelism()); + defaultKeyValue.put(HudiConnectConfig.CONN_HUDI_UPSERT_SHUFFLE_PARALLELISM, sinkConnectConfig.getUpsertShuffleParallelism()); + defaultKeyValue.put(HudiConnectConfig.CONN_HUDI_DELETE_PARALLELISM, sinkConnectConfig.getDeleteParallelism()); + defaultKeyValue.put(HudiConnectConfig.CONN_SOURCE_RECORD_CONVERTER, sinkConnectConfig.getSrcRecordConverter()); + defaultKeyValue.put(HudiConnectConfig.CONN_TOPIC_NAMES, sinkConnectConfig.getTopicNames()); + defaultKeyValue.put(HudiConnectConfig.CONN_SCHEMA_PATH, sinkConnectConfig.getSchemaPath()); + defaultKeyValue.put(HudiConnectConfig.CONN_TASK_PARALLELISM, sinkConnectConfig.getTaskParallelism()); + defaultKeyValue.put(HudiConnectConfig.CONN_TASK_DIVIDE_STRATEGY, sinkConnectConfig.getTaskDivideStrategy()); + defaultKeyValue.put(HudiConnectConfig.CONN_WHITE_LIST, JSONObject.toJSONString(sinkConnectConfig.getWhiteList())); + defaultKeyValue.put(HudiConnectConfig.CONN_SCHEMA_PATH, sinkConnectConfig.getSchemaPath()); + defaultKeyValue.put(HudiConnectConfig.CONN_TOPIC_ROUTE_INFO, JSONObject.toJSONString(sinkConnectConfig.getTopicRouteMap())); + log.info("taskConfig : " + defaultKeyValue + ", sinkConnectConfig : " + sinkConnectConfig); + ITaskDivideStrategy strategy = TaskDivideStrategyFactory.getInstance(); + List<KeyValue> taskConfigs = strategy.divide(defaultKeyValue); + return taskConfigs; + } +} diff --git 
a/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkTask.java b/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkTask.java new file mode 100644 index 0000000..b01c660 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkTask.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.connect.hudi.connector; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.common.QueueMetaData; +import io.openmessaging.connector.api.data.SinkDataEntry; +import io.openmessaging.connector.api.sink.SinkTask; +import org.apache.rocketmq.connect.hudi.config.HudiConnectConfig; +import org.apache.rocketmq.connect.hudi.config.ConfigUtil; +import org.apache.rocketmq.connect.hudi.sink.Updater; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Map; + + +/** + * Sink task that hands each SinkDataEntry received from RocketMQ to an Updater, + * which batches the records and upserts them into the configured Hudi table. + */ +public class HudiSinkTask extends SinkTask { + private static final Logger log = LoggerFactory.getLogger(HudiSinkTask.class); + + private HudiConnectConfig hudiConnectConfig; + private Updater updater; + + public HudiSinkTask() { + this.hudiConnectConfig = new HudiConnectConfig(); + } + + @Override + public void put(Collection<SinkDataEntry> sinkDataEntries) { + try { + log.info("Hudi Sink Task trying to put()"); + for (SinkDataEntry record : sinkDataEntries) { + log.info("Hudi Sink Task trying to call updater.push()"); + boolean isSuccess = updater.push(record); + if (!isSuccess) { + log.error("Hudi sink push data error, record:{}", record); + } + log.debug("Hudi pushed data : " + record); + } + } catch (Exception e) { + log.error("put sinkDataEntries error", e); + } + } + + @Override + public void commit(Map<QueueMetaData, Long> map) { + + } + + /** + * Loads the task configuration into hudiConnectConfig, then creates and starts the + * Updater that writes records to the Hudi table. + * @param props task configuration passed in by the connect runtime + */ + @Override + public void start(KeyValue props) { + try { + ConfigUtil.load(props, this.hudiConnectConfig); + log.info("init data source success"); + } catch (Exception e) { + log.error("Cannot start Hudi Sink Task 
because of configuration error{}", e); + } + try { + updater = new Updater(hudiConnectConfig); + updater.start(); + } catch (Throwable e) { + log.error("fail to start updater{}", e); + } + + } + + @Override + public void stop() { + try { + updater.stop(); + log.info("hudi sink task connection is closed."); + } catch (Throwable e) { + log.warn("sink task stop error while closing connection to {}", "hudi", e); + } + } + + @Override + public void pause() { + + } + + @Override + public void resume() { + + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java b/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java new file mode 100644 index 0000000..8e7e288 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.rocketmq.connect.hudi.sink; + + +import io.openmessaging.connector.api.data.SinkDataEntry; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hudi.client.HoodieJavaWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieJavaEngineContext; +import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.HoodieIndex; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.avro.GenericDataSupplier; +import org.apache.rocketmq.connect.hudi.config.HudiConnectConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class Updater { + + private final Logger log = LoggerFactory.getLogger(getClass()); + private HudiConnectConfig hudiConnectConfig; + private 
HoodieJavaWriteClient hudiWriteClient; + private HoodieWriteConfig cfg; + private transient ScheduledExecutorService scheduledExecutor; + private int flushIntervalMs = 3000; + private int batchSize = 100; + private List<SinkDataEntry> inflightList; + private Object batchLocker = new Object(); + + + public Updater(HudiConnectConfig hudiConnectConfig) throws Exception { + this.hudiConnectConfig = hudiConnectConfig; + + try { + File schemaFile = new File(hudiConnectConfig.getSchemaPath()); + this.hudiConnectConfig.schema = new Schema.Parser().parse(schemaFile); + log.info("Hudi schema : " + this.hudiConnectConfig.schema.toString()); + } catch (IOException e) { + throw new Exception(String.format("Failed to find schema file %s", hudiConnectConfig.getSchemaPath()), e); + } + Configuration hadoopConf = new Configuration(); + hadoopConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false); + hadoopConf.set(AvroReadSupport.AVRO_DATA_SUPPLIER, GenericDataSupplier.class.getName()); + hadoopConf.setClassLoader(this.getClass().getClassLoader()); + hadoopConf.set("fs.hdfs.impl", + DistributedFileSystem.class.getName() + ); + hadoopConf.set("fs.file.impl", + LocalFileSystem.class.getName() + ); + + // fs.%s.impl.disable.cache + hadoopConf.set("fs.file.impl.disable.cache", String.valueOf(true)); + Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); + + Path path = new Path(hudiConnectConfig.getTablePath()); + FileSystem fs = FSUtils.getFs(hudiConnectConfig.getTablePath(), hadoopConf); + if (!fs.exists(path)) { + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(hudiConnectConfig.getTableType()) + .setTableName(hudiConnectConfig.getTableName()) + .setPayloadClassName(HoodieAvroPayload.class.getName()) + .initTable(hadoopConf, hudiConnectConfig.getTablePath()); + } + log.info("Hudi inited table"); + + this.cfg = HoodieWriteConfig.newBuilder().withPath(hudiConnectConfig.getTablePath()) + .withSchema(this.hudiConnectConfig.schema.toString()) + 
.withEngineType(EngineType.JAVA) + .withParallelism(hudiConnectConfig.getInsertShuffleParallelism(), hudiConnectConfig.getUpsertShuffleParallelism()) + .withDeleteParallelism(hudiConnectConfig.getDeleteParallelism()).forTable(hudiConnectConfig.getTableName()) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build(); + cfg.getAvroSchemaValidate(); + this.hudiWriteClient = + new HoodieJavaWriteClient<HoodieAvroPayload>(new HoodieJavaEngineContext(hadoopConf), cfg); + log.info("Open HoodieJavaWriteClient successfully"); + + inflightList = new ArrayList<>(); + if (batchSize > 0) { + scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + scheduledExecutor.scheduleAtFixedRate( + () -> { + try { + commit(); + } catch (Exception e) { + log.error("Flush error when executed at fixed rate", e); + } + }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS); + } + } + + private GenericRecord sinkDataEntry2GenericRecord(SinkDataEntry record) { + byte[] recordBytes = (byte[]) record.getPayload()[0]; + GenericRecord genericRecord = new GenericData.Record(this.hudiConnectConfig.schema); + DatumReader<GenericRecord> userDatumReader = new SpecificDatumReader<GenericRecord>(this.hudiConnectConfig.schema); + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(recordBytes, null); + try { + if (!decoder.isEnd()) { + genericRecord = userDatumReader.read(genericRecord, decoder); + } + } catch (IOException e) { + log.error("SinkDataEntry convert to GenericRecord occur error,", e); + } + return genericRecord; + } + + public boolean push(SinkDataEntry record) { + log.info("Updater Trying to push data"); + Boolean isSuccess = true; + if (record == null) { + log.warn("Updater push sinkDataRecord null."); + return true; + } + synchronized (batchLocker) { + inflightList.add(record); + } + if (inflightList.size() >= 
batchSize) { + try { + scheduledExecutor.submit(this::commit); + } catch (Exception e) { + log.error("Updater commit error", e); + isSuccess = false; + } + } + return isSuccess; + } + + private void schemaEvolution(Schema newSchema, Schema oldSchema) { + if (null != oldSchema && oldSchema.toString().equals(newSchema.toString())) { + return; + } + log.info("Schema changed. New schema is " + newSchema.toString()); + this.cfg = HoodieWriteConfig.newBuilder().withPath(hudiConnectConfig.getTablePath()) + .withSchema(this.hudiConnectConfig.schema.toString()) + .withEngineType(EngineType.JAVA) + .withParallelism(hudiConnectConfig.getInsertShuffleParallelism(), hudiConnectConfig.getUpsertShuffleParallelism()) + .withDeleteParallelism(hudiConnectConfig.getDeleteParallelism()).forTable(hudiConnectConfig.getTableName()) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build(); + this.hudiWriteClient.close(); + Configuration hadoopConf = new Configuration(); + hadoopConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false); + hadoopConf.set(AvroReadSupport.AVRO_DATA_SUPPLIER, GenericDataSupplier.class.getName()); + this.hudiWriteClient = + new HoodieJavaWriteClient<HoodieAvroPayload>(new HoodieJavaEngineContext(hadoopConf), cfg); + } + + public void commit() { + List<SinkDataEntry> commitList; + if (inflightList.isEmpty()) { + return; + } + synchronized (batchLocker) { + commitList = inflightList; + inflightList = new ArrayList<>(); + } + List<HoodieRecord> hoodieRecordsList = new ArrayList<>(); + for (SinkDataEntry record : commitList) { + GenericRecord genericRecord = sinkDataEntry2GenericRecord(record); + HoodieRecord<HoodieAvroPayload> hoodieRecord = new HoodieRecord(new HoodieKey(UUID.randomUUID().toString(), "shardingKey-" + record.getQueueName()), new HoodieAvroPayload(Option.of(genericRecord))); + 
hoodieRecordsList.add(hoodieRecord); + } + try { + List<WriteStatus> statuses = hudiWriteClient.upsert(hoodieRecordsList, hudiWriteClient.startCommit()); + log.info("Upserted data to hudi"); + long upserted = statuses.get(0).getStat().getNumInserts(); + if (upserted != commitList.size()) { + log.warn("Upserted num not equals input"); + } + } catch (Exception e) { + log.error("Exception when upserting to Hudi", e); + } + } + + public void start() throws Exception { + log.info("schema load success"); + } + + public void stop() { + this.hudiWriteClient.close(); + log.info("Hudi sink updater stopped."); + } + + public HudiConnectConfig getHudiConnectConfig() { + return hudiConnectConfig; + } + + public void setHudiConnectConfig(HudiConnectConfig hudiConnectConfig) { + this.hudiConnectConfig = hudiConnectConfig; + } + +} diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/strategy/ITaskDivideStrategy.java b/src/main/java/org/apache/rocketmq/connect/hudi/strategy/ITaskDivideStrategy.java new file mode 100644 index 0000000..a91c066 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/hudi/strategy/ITaskDivideStrategy.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.connect.hudi.strategy; + +import io.openmessaging.KeyValue; + +import java.util.List; + + +public interface ITaskDivideStrategy { + List<KeyValue> divide(KeyValue source); +} diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideByQueueStrategy.java b/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideByQueueStrategy.java new file mode 100644 index 0000000..c68e17c --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideByQueueStrategy.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.connect.hudi.strategy; + +import com.alibaba.fastjson.JSONObject; +import io.openmessaging.KeyValue; +import io.openmessaging.internal.DefaultKeyValue; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.connect.hudi.config.HudiConnectConfig; + + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + + +public class TaskDivideByQueueStrategy implements ITaskDivideStrategy { + @Override + public List<KeyValue> divide(KeyValue source) { + List<KeyValue> config = new ArrayList<KeyValue>(); + int parallelism = source.getInt(HudiConnectConfig.CONN_TASK_PARALLELISM); + Map<String, MessageQueue> topicRouteInfos = (Map<String, MessageQueue>) JSONObject.parse(source.getString(HudiConnectConfig.CONN_TOPIC_ROUTE_INFO)); + int id = 0; + List<List<String>> taskTopicQueues = new ArrayList<>(parallelism); + // create one bucket per task up front: new ArrayList<>(parallelism) only reserves capacity, + // so taskTopicQueues.get(ind) would otherwise be out of bounds + for (int i = 0; i < parallelism; i++) { + taskTopicQueues.add(new LinkedList<String>()); + } + + for (Map.Entry<String, MessageQueue> topicQueue : topicRouteInfos.entrySet()) { + MessageQueue messageQueue = topicQueue.getValue(); + String topicQueueStr = messageQueue.getTopic() + "," + messageQueue.getBrokerName() + "," + messageQueue.getQueueId(); + int ind = ++id % parallelism; + List<String> taskTopicQueue = taskTopicQueues.get(ind); + taskTopicQueue.add(topicQueueStr); + } + + for (int i = 0; i < parallelism; i++) { + // build single task queue config; format is topicName1,brokerName1,queueId1;topicName1,brokerName1,queueId2 + String singleTaskTopicQueueStr = ""; + List<String> singleTaskTopicQueues = taskTopicQueues.get(i); + for (String singleTopicQueue : singleTaskTopicQueues) { + singleTaskTopicQueueStr += singleTopicQueue + ";"; + } + singleTaskTopicQueueStr = singleTaskTopicQueueStr.isEmpty() ? "" : singleTaskTopicQueueStr.substring(0, singleTaskTopicQueueStr.length() - 1); + // fill connect 
config; + KeyValue keyValue = new DefaultKeyValue(); + keyValue.put(HudiConnectConfig.CONN_TOPIC_QUEUES, singleTaskTopicQueueStr); + keyValue.put(HudiConnectConfig.CONN_HUDI_TABLE_PATH, source.getString(HudiConnectConfig.CONN_HUDI_TABLE_PATH)); + keyValue.put(HudiConnectConfig.CONN_HUDI_TABLE_NAME, source.getString(HudiConnectConfig.CONN_HUDI_TABLE_NAME)); + keyValue.put(HudiConnectConfig.CONN_HUDI_INSERT_SHUFFLE_PARALLELISM, source.getInt(HudiConnectConfig.CONN_HUDI_INSERT_SHUFFLE_PARALLELISM)); + keyValue.put(HudiConnectConfig.CONN_HUDI_UPSERT_SHUFFLE_PARALLELISM, source.getInt(HudiConnectConfig.CONN_HUDI_UPSERT_SHUFFLE_PARALLELISM)); + keyValue.put(HudiConnectConfig.CONN_HUDI_DELETE_PARALLELISM, source.getInt(HudiConnectConfig.CONN_HUDI_DELETE_PARALLELISM)); + keyValue.put(HudiConnectConfig.CONN_SOURCE_RECORD_CONVERTER, source.getString(HudiConnectConfig.CONN_SOURCE_RECORD_CONVERTER)); + keyValue.put(HudiConnectConfig.CONN_SCHEMA_PATH, source.getString(HudiConnectConfig.CONN_SCHEMA_PATH)); + keyValue.put(HudiConnectConfig.CONN_TASK_PARALLELISM, source.getInt(HudiConnectConfig.CONN_TASK_PARALLELISM)); + keyValue.put(HudiConnectConfig.CONN_SCHEMA_PATH, source.getString(HudiConnectConfig.CONN_SCHEMA_PATH)); + config.add(keyValue); + } + + return config; + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideStrategyFactory.java b/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideStrategyFactory.java new file mode 100644 index 0000000..1d693a8 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideStrategyFactory.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.hudi.strategy; + + +public class TaskDivideStrategyFactory { + public static ITaskDivideStrategy getInstance() { + return new TaskDivideByQueueStrategy(); + } +} diff --git a/style/rmq_checkstyle.xml b/style/rmq_checkstyle.xml new file mode 100644 index 0000000..776b305 --- /dev/null +++ b/style/rmq_checkstyle.xml @@ -0,0 +1,135 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<!DOCTYPE module PUBLIC + "-//Puppy Crawl//DTD Check Configuration 1.3//EN" + "http://www.puppycrawl.com/dtds/configuration_1_3.dtd"> +<!--Refer http://checkstyle.sourceforge.net/reports/google-java-style.html#s2.2-file-encoding --> +<module name="Checker"> + + <property name="localeLanguage" value="en"/> + + <!--To configure the check to report on the first instance in each file--> + <module name="FileTabCharacter"/> + + <!-- header --> + <module name="RegexpHeader"> + <property name="header" value="/\*\nLicensed to the Apache Software Foundation*"/> + </module> + + <module name="RegexpSingleline"> + <property name="format" value="System\.out\.println"/> + <property name="message" value="Prohibit invoking System.out.println in source code !"/> + </module> + + <module name="RegexpSingleline"> + <property name="format" value="//FIXME"/> + <property name="message" value="Recommended fix FIXME task !"/> + </module> + + <module name="RegexpSingleline"> + <property name="format" value="//TODO"/> + <property name="message" value="Recommended fix TODO task !"/> + </module> + + <module name="RegexpSingleline"> + <property name="format" value="@alibaba"/> + <property name="message" value="Recommended remove @alibaba keyword!"/> + </module> + <module name="RegexpSingleline"> + <property name="format" value="@taobao"/> + <property name="message" value="Recommended remove @taobao keyword!"/> + </module> + <module name="RegexpSingleline"> + <property name="format" value="@author"/> + <property name="message" value="Recommended remove @author tag in javadoc!"/> + </module> + + <module name="RegexpSingleline"> + <property name="format" + value=".*[\u3400-\u4DB5\u4E00-\u9FA5\u9FA6-\u9FBB\uF900-\uFA2D\uFA30-\uFA6A\uFA70-\uFAD9\uFF00-\uFFEF\u2E80-\u2EFF\u3000-\u303F\u31C0-\u31EF]+.*"/> + <property name="message" value="Not allow chinese character !"/> + </module> + + <module name="FileLength"> + <property name="max" value="3000"/> + </module> + + <module name="TreeWalker"> 
+
+        <module name="UnusedImports">
+            <property name="processJavadoc" value="true"/>
+        </module>
+        <module name="RedundantImport"/>
+
+        <!--<module name="IllegalImport" />-->
+
+        <!--Checks that classes that override equals() also override hashCode()-->
+        <module name="EqualsHashCode"/>
+        <!--Checks for over-complicated boolean expressions. Currently finds code like if (topic == true), topic || true, !false, etc.-->
+        <module name="SimplifyBooleanExpression"/>
+        <module name="OneStatementPerLine"/>
+        <module name="UnnecessaryParentheses"/>
+        <!--Checks for over-complicated boolean return statements. For example the following code-->
+        <module name="SimplifyBooleanReturn"/>
+
+        <!--Check that the default is after all the cases in producerGroup switch statement-->
+        <module name="DefaultComesLast"/>
+        <!--Detects empty statements (standalone ";" semicolon)-->
+        <module name="EmptyStatement"/>
+        <!--Checks that long constants are defined with an upper ell-->
+        <module name="UpperEll"/>
+        <module name="ConstantName">
+            <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
+        </module>
+        <!--Checks that local, non-final variable names conform to producerGroup format specified by the format property-->
+        <module name="LocalVariableName"/>
+        <!--Validates identifiers for local, final variables, including catch parameters-->
+        <module name="LocalFinalVariableName"/>
+        <!--Validates identifiers for non-static fields-->
+        <module name="MemberName"/>
+        <!--Validates identifiers for class type parameters-->
+        <module name="ClassTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <!--Validates identifiers for method type parameters-->
+        <module name="MethodTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <module name="PackageName"/>
+        <module name="ParameterName"/>
+        <module name="StaticVariableName"/>
+        <module name="TypeName"/>
+        <!--Checks that there are no import statements that use the * notation-->
+        <module name="AvoidStarImport"/>
+
+        <!--whitespace-->
+        <module name="GenericWhitespace"/>
+        <module name="NoWhitespaceBefore"/>
+        <module name="WhitespaceAfter"/>
+        <module name="NoWhitespaceAfter"/>
+        <module name="WhitespaceAround">
+            <property name="allowEmptyConstructors" value="true"/>
+            <property name="allowEmptyMethods" value="true"/>
+        </module>
+        <module name="Indentation"/>
+        <module name="MethodParamPad"/>
+        <module name="ParenPad"/>
+        <module name="TypecastParenPad"/>
+    </module>
+</module>
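
Note on the regular expressions in style/rmq_checkstyle.xml: the following is a minimal sketch, not part of the commit, that exercises two of the patterns configured above with plain `java.util.regex`. It does not invoke Checkstyle itself. The class name `CheckstylePatternDemo` and its method names are hypothetical, and the sketch assumes that Checkstyle applies these formats with search semantics (`Matcher.find()`), which for these anchored or `.*`-wrapped patterns should behave like a whole-string match.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; it is not part of rocketmq-connect-hudi.
public class CheckstylePatternDemo {

    // Copied from the ConstantName "format" property:
    // upper-case words separated by single underscores, or the literal name "log".
    private static final Pattern CONSTANT_NAME =
        Pattern.compile("(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)");

    // Copied from the RegexpSingleline "format" property that rejects Chinese characters.
    // The backslashes are doubled so the regex engine, not the Java compiler,
    // interprets the Unicode escapes.
    private static final Pattern CJK_LINE = Pattern.compile(
        ".*[\\u3400-\\u4DB5\\u4E00-\\u9FA5\\u9FA6-\\u9FBB\\uF900-\\uFA2D"
            + "\\uFA30-\\uFA6A\\uFA70-\\uFAD9\\uFF00-\\uFFEF\\u2E80-\\u2EFF"
            + "\\u3000-\\u303F\\u31C0-\\u31EF]+.*");

    // Returns true when the name would satisfy the ConstantName format.
    public static boolean isValidConstantName(String name) {
        return CONSTANT_NAME.matcher(name).find();
    }

    // Returns true when the source line would be flagged by the Chinese-character rule.
    public static boolean containsCjk(String line) {
        return CJK_LINE.matcher(line).find();
    }

    public static void main(String[] args) {
        String[] names = {"MAX_SIZE", "log", "maxSize", "MAX__SIZE"};
        for (String name : names) {
            System.out.println(name + " accepted: " + isValidConstantName(name));
        }
        System.out.println("ASCII comment flagged: " + containsCjk("// plain comment"));
    }
}
```

Under these assumptions, `MAX_SIZE` and `log` are accepted as constant names, while `maxSize` and `MAX__SIZE` are rejected, since each underscore must be followed by at least one upper-case letter or digit.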
