guyuqi commented on pull request #4617: URL: https://github.com/apache/hudi/pull/4617#issuecomment-1018298182
> LGTM. @guyuqi Could you run the [Quick Start Guide](https://github.com/apache/hudi/tree/master/hudi-kafka-connect) for Kafka Connect Sink for Hudi to make sure the Sink functionality is not affected?

Thanks for your comments.
From the [Quick Start Guide](https://github.com/apache/hudi/tree/master/hudi-kafka-connect),

**Environment:**
```
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk/
export CONFLUENT_DIR=/home/builder/confluent-7.0.1
export PATH=${CONFLUENT_DIR}/bin:${PATH}
export KAFKA_HOME=/home/builder/kafka_2.12-3.0.0
export HUDI_DIR=/home/builder/hudi

Linux fdr33-test-vm 5.11.0-43-generic #47~20.04.2-Ubuntu SMP Mon Dec 13 11:10:13 UTC 2021 aarch64 aarch64 aarch64 GNU/Linux
```

**1. Successfully created the Hudi topic for the Sink and inserted data into the topic:**
```
[builder@fdr33-test-vm demo]$ bash setupKafka.sh -n 3
Argument num-kafka-records is 3
Delete Kafka topic hudi-test-topic ...
Create Kafka topic hudi-test-topic ...
Created topic hudi-test-topic.
{"id":1}{"subject":"hudi-test-topic","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"stock_ticks\",\"fields\":[{\"name\":\"volume\",\"type\":\"long\"},{\"name\":\"ts\",\"type\":\"string\"},{\"name\":\"symbol\",\"type\":\"string\"},{\"name\":\"year\",\"type\":\"int\"},{\"name\":\"month\",\"type\":\"string\"},{\"name\":\"high\",\"type\":\"double\"},{\"name\":\"low\",\"type\":\"double\"},{\"name\":\"key\",\"type\":\"string\"},{\"name\":\"date\",\"type\":\"string\"},{\"name\":\"close\",\"type\":\"double\"},{\"name\":\"open\",\"type\":\"double\"},{\"name\":\"day\",\"type\":\"string\"}]}"}Fri Jan 21 08:25:52 UTC 2022 Start batch 1 ...
Fri Jan 21 08:25:53 UTC 2022 Record key until 3 publish to Kafka ...
```
```
[builder@fdr33-test-vm demo]$ bash setupKafka.sh -n 3 -b 3
Argument num-kafka-records is 3
Argument num-batch is 3
Delete Kafka topic hudi-test-topic ...
Create Kafka topic hudi-test-topic ...
Created topic hudi-test-topic.
{"id":1}{"subject":"hudi-test-topic","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"stock_ticks\",\"fields\":[{\"name\":\"volume\",\"type\":\"long\"},{\"name\":\"ts\",\"type\":\"string\"},{\"name\":\"symbol\",\"type\":\"string\"},{\"name\":\"year\",\"type\":\"int\"},{\"name\":\"month\",\"type\":\"string\"},{\"name\":\"high\",\"type\":\"double\"},{\"name\":\"low\",\"type\":\"double\"},{\"name\":\"key\",\"type\":\"string\"},{\"name\":\"date\",\"type\":\"string\"},{\"name\":\"close\",\"type\":\"double\"},{\"name\":\"open\",\"type\":\"double\"},{\"name\":\"day\",\"type\":\"string\"}]}"}Fri Jan 21 08:27:04 UTC 2022 Start batch 1 ...
Fri Jan 21 08:27:04 UTC 2022 Record key until 3 publish to Kafka ...
Fri Jan 21 08:27:24 UTC 2022 Start batch 2 ...
Fri Jan 21 08:27:25 UTC 2022 Record key until 6 publish to Kafka ...
Fri Jan 21 08:27:45 UTC 2022 Start batch 3 ...
Fri Jan 21 08:27:45 UTC 2022 Record key until 9 publish to Kafka ...
```

**2. Run the Sink connector worker**
```
[builder@fdr33-test-vm kafka_2.12-3.0.0]$ ./bin/connect-distributed.sh $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties
[2022-01-21 08:30:48,524] INFO WorkerInfo values:
	jvm.args = -Xms256M, -Xmx2G, -XX:+UseG1GC, -XX:MaxGCPauseMillis=20, -XX:InitiatingHeapOccupancyPercent=35, -XX:+ExplicitGCInvokesConcurrent, -XX:MaxInlineLevel=15, -Djava.awt.headless=true, -Dcom.sun.management.jmxremote, -Dcom.sun.management.jmxremote.authenticate=false, -Dcom.sun.management.jmxremote.ssl=false, -Dkafka.logs.dir=/home/builder/kafka_2.12-3.0.0/bin/../logs, -Dlog4j.configuration=file:./bin/../config/connect-log4j.properties
	jvm.spec = Red Hat, Inc., OpenJDK 64-Bit Server VM, 1.8.0_312, 25.312-b07
	jvm.classpath = /home/builder/kafka_2.12-3.0.0/bin/../libs/activation-1.1.1.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/aopalliance-repackaged-2.6.1.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/argparse4j-0.7.0.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/audience-annotations-0.5.0.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/commons-cli-1.4.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/commons-lang3-3.8.1.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/connect-api-3.0.0.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/connect-basic-auth-extension-3.0.0.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/connect-file-3.0.0.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/connect-json-3.0.0.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/connect-mirror-3.0.0.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/connect-mirror-client-3.0.0.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/connect-runtime-3.0.0.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/connect-transforms-3.0.0.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/hk2-api-2.6.1.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/hk2-locator-2.6.1.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/hk2-utils-2.6.1.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/jackson-annotations-2.12.3.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/jackson-core-2.12.3.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/jackson-databind-2.12.3.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/jackson-dataformat-csv-2.12.3.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/jackson-datatype-jdk8-2.12.3.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/jackson-jaxrs-base-2.12.3.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/jackson-jaxrs-json-provider-2.12.3.jar:/home/builder/kafka_2.12-3.0.0/bin/../libs/jackson-module-jaxb-annotations
...............
..........
.......
[2022-01-21 08:31:05,629] INFO [Consumer clientId=consumer-hudi-connect-cluster-3, groupId=hudi-connect-cluster] Subscribed to partition(s): connect-configs-0 (org.apache.kafka.clients.consumer.KafkaConsumer:1121)
[2022-01-21 08:31:05,630] INFO [Consumer clientId=consumer-hudi-connect-cluster-3, groupId=hudi-connect-cluster] Seeking to EARLIEST offset of partition connect-configs-0 (org.apache.kafka.clients.consumer.internals.SubscriptionState:641)
[2022-01-21 08:31:05,641] INFO [Consumer clientId=consumer-hudi-connect-cluster-3, groupId=hudi-connect-cluster] Resetting offset for partition connect-configs-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[fdr33-test-vm:9092 (id: 0 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:398)
[2022-01-21 08:31:05,642] INFO Finished reading KafkaBasedLog for topic connect-configs (org.apache.kafka.connect.util.KafkaBasedLog:202)
[2022-01-21 08:31:05,642] INFO Started KafkaBasedLog for topic connect-configs (org.apache.kafka.connect.util.KafkaBasedLog:204)
[2022-01-21 08:31:05,643] INFO Started KafkaConfigBackingStore (org.apache.kafka.connect.storage.KafkaConfigBackingStore:306)
[2022-01-21 08:31:05,643] INFO [Worker clientId=connect-1, groupId=hudi-connect-cluster] Herder started (org.apache.kafka.connect.runtime.distributed.DistributedHerder:322)
[2022-01-21 08:31:05,657] INFO [Worker clientId=connect-1, groupId=hudi-connect-cluster] Cluster ID: n2GHZbkaRAaByx27gk9Bhw (org.apache.kafka.clients.Metadata:287)
[2022-01-21 08:31:05,658] INFO [Worker clientId=connect-1, groupId=hudi-connect-cluster] Discovered group coordinator fdr33-test-vm:9092 (id: 2147483647 rack: null) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:849)
[2022-01-21 08:31:05,661] INFO [Worker clientId=connect-1, groupId=hudi-connect-cluster] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:222)
[2022-01-21 08:31:05,662] INFO [Worker clientId=connect-1, groupId=hudi-connect-cluster] (Re-)joining group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:535)
[2022-01-21 08:31:05,683] INFO [Worker clientId=connect-1, groupId=hudi-connect-cluster] (Re-)joining group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:535)
[2022-01-21 08:31:05,688] INFO [Worker clientId=connect-1, groupId=hudi-connect-cluster] Successfully joined group with generation Generation{generationId=1, memberId='connect-1-02d6edfd-fcc3-4565-b4fc-e2a550da3ef4', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:591)
[2022-01-21 08:31:05,722] INFO [Worker clientId=connect-1, groupId=hudi-connect-cluster] Successfully synced group in generation Generation{generationId=1, memberId='connect-1-02d6edfd-fcc3-4565-b4fc-e2a550da3ef4', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:757)
[2022-01-21 08:31:05,723] INFO [Worker clientId=connect-1, groupId=hudi-connect-cluster] Joined group at generation 1 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-02d6edfd-fcc3-4565-b4fc-e2a550da3ef4', leaderUrl='http://172.17.0.2:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1848)
[2022-01-21 08:31:05,723] INFO [Worker clientId=connect-1, groupId=hudi-connect-cluster] Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1378)
[2022-01-21 08:31:05,724] INFO [Worker clientId=connect-1, groupId=hudi-connect-cluster] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1406)
[2022-01-21 08:31:05,823] INFO [Worker clientId=connect-1, groupId=hudi-connect-cluster] Session key updated
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1716)
```
