[
https://issues.apache.org/jira/browse/HUDI-4575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Vishal Agarwal updated HUDI-4575:
---------------------------------
Description:
Hi team,
I am trying to run the Hudi Sink Connector with Kafka Connect. When the
connector starts, it launches the Transaction Coordinator, which initialises
the global committed offsets from the latest Hudi commit file. On a first-time
run there is no commit file, so it logs:
[2022-08-08 19:58:20,529] INFO Hoodie Extra Metadata from latest commit is
absent (org.apache.hudi.connect.writers.KafkaConnectTransactionServices:147)
But if, on that first run, the earliest Kafka offset is not 0, the process
keeps running commit timelines without making progress. Ideally, on the first
run the global offsets should be initialised to the earliest Kafka offsets.
In the current implementation, the participant compares its local offset with
the coordinator's offset and, on a mismatch, resets it to 0. This breaks on a
fresh run where the earliest committed Kafka offset is not 0.
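A minimal sketch of the behaviour I would expect, assuming it runs at the
point where the coordinator currently defaults to 0 (the helper name
seedGlobalOffsetsFromEarliest and its wiring are hypothetical, not existing
Hudi API; KafkaConsumer#beginningOffsets is standard Kafka client API):
{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper: when no Hudi commit metadata exists, seed the global
// committed offsets from the earliest offsets Kafka still retains, not 0.
public class OffsetSeeding {
  static Map<Integer, Long> seedGlobalOffsetsFromEarliest(Properties consumerProps, String topic) {
    // consumerProps must carry bootstrap.servers and key/value deserializers.
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
      List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
          .map(p -> new TopicPartition(topic, p.partition()))
          .collect(Collectors.toList());
      // beginningOffsets can return values greater than 0 when retention or
      // compaction has removed the head of the log, the exact case above.
      Map<TopicPartition, Long> earliest = consumer.beginningOffsets(partitions);
      Map<Integer, Long> globalOffsets = new HashMap<>();
      earliest.forEach((tp, offset) -> globalOffsets.put(tp.partition(), offset));
      return globalOffsets;
    }
  }
}
{code}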
was:
Hi team,
I am trying to use the Hudi sink connector with Kafka Connect to write to a
GCS bucket, but I am getting an error about the "gs" file scheme. I have added
all the GCS-related properties in core-site.xml and put the corresponding
gcs-connector jar in the plugin path, but I am still facing the issue.
The same issue was already reported for S3 in
https://issues.apache.org/jira/browse/HUDI-3610, but I am unable to find the
resolution there.
Happy to discuss this!
Thanks
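For reference, a minimal standalone check that should reproduce or rule out
the classpath problem (the two property names are the standard gcs-connector
registrations; the bucket path is a placeholder):
{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GcsSchemeCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Standard gcs-connector registrations, normally set in core-site.xml.
    conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
    conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS");
    // If this still throws UnsupportedFileSystemException ("No FileSystem for
    // scheme gs"), the gcs-connector jar is not visible to this classloader.
    FileSystem fs = new Path("gs://some-bucket/some/prefix").getFileSystem(conf);
    System.out.println("Loaded: " + fs.getClass().getName());
  }
}
{code}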
*StackTrace-*
org.apache.hudi.exception.HoodieException: Fatal error instantiating Hudi Write Provider
    at org.apache.hudi.connect.writers.KafkaConnectWriterProvider.<init>(KafkaConnectWriterProvider.java:103) ~[connectors-uber.jar:?]
    at org.apache.hudi.connect.transaction.ConnectTransactionParticipant.<init>(ConnectTransactionParticipant.java:65) ~[connectors-uber.jar:?]
    at org.apache.hudi.connect.HoodieSinkTask.bootstrap(HoodieSinkTask.java:198) [connectors-uber.jar:?]
    at org.apache.hudi.connect.HoodieSinkTask.open(HoodieSinkTask.java:151) [connectors-uber.jar:?]
    at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:587) [connect-runtime-2.4.1.jar:?]
    at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:67) [connect-runtime-2.4.1.jar:?]
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:652) [connect-runtime-2.4.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272) [kafka-clients-2.4.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400) [kafka-clients-2.4.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421) [kafka-clients-2.4.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340) [kafka-clients-2.4.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471) [kafka-clients-2.4.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267) [kafka-clients-2.4.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) [kafka-clients-2.4.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) [kafka-clients-2.4.1.jar:?]
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444) [connect-runtime-2.4.1.jar:?]
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317) [connect-runtime-2.4.1.jar:?]
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224) [connect-runtime-2.4.1.jar:?]
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192) [connect-runtime-2.4.1.jar:?]
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177) [connect-runtime-2.4.1.jar:?]
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227) [connect-runtime-2.4.1.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_331]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_331]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_331]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_331]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_331]
Caused by: org.apache.hudi.exception.HoodieIOException: Failed to get instance of org.apache.hadoop.fs.FileSystem
    at org.apache.hudi.common.fs.FSUtils.getFs(FSUtils.java:109) ~[connectors-uber.jar:?]
    at org.apache.hudi.common.fs.FSUtils.getFs(FSUtils.java:100) ~[connectors-uber.jar:?]
    at org.apache.hudi.client.BaseHoodieClient.<init>(BaseHoodieClient.java:69) ~[connectors-uber.jar:?]
    at org.apache.hudi.client.BaseHoodieWriteClient.<init>(BaseHoodieWriteClient.java:175) ~[connectors-uber.jar:?]
    at org.apache.hudi.client.BaseHoodieWriteClient.<init>(BaseHoodieWriteClient.java:160) ~[connectors-uber.jar:?]
    at org.apache.hudi.client.HoodieJavaWriteClient.<init>(HoodieJavaWriteClient.java:55) ~[connectors-uber.jar:?]
    at org.apache.hudi.connect.writers.KafkaConnectWriterProvider.<init>(KafkaConnectWriterProvider.java:101) ~[connectors-uber.jar:?]
    ... 25 more
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "gs"
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3225) ~[connectors-uber.jar:?]
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3245) ~[connectors-uber.jar:?]
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:121) ~[connectors-uber.jar:?]
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3296) ~[connectors-uber.jar:?]
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3264) ~[connectors-uber.jar:?]
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:475) ~[connectors-uber.jar:?]
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356) ~[connectors-uber.jar:?]
    at org.apache.hudi.common.fs.FSUtils.getFs(FSUtils.java:107) ~[connectors-uber.jar:?]
    at org.apache.hudi.common.fs.FSUtils.getFs(FSUtils.java:100) ~[connectors-uber.jar:?]
    at org.apache.hudi.client.BaseHoodieClient.<init>(BaseHoodieClient.java:69) ~[connectors-uber.jar:?]
    at org.apache.hudi.client.BaseHoodieWriteClient.<init>(BaseHoodieWriteClient.java:175) ~[connectors-uber.jar:?]
    at org.apache.hudi.client.BaseHoodieWriteClient.<init>(BaseHoodieWriteClient.java:160) ~[connectors-uber.jar:?]
    at org.apache.hudi.client.HoodieJavaWriteClient.<init>(HoodieJavaWriteClient.java:55) ~[connectors-uber.jar:?]
    at org.apache.hudi.connect.writers.KafkaConnectWriterProvider.<init>(KafkaConnectWriterProvider.java:101) ~[connectors-uber.jar:?]
    ... 25 more
> Initial Kafka Global Offsets in Hudi Kafka Sink Connector
> ----------------------------------------------------------
>
> Key: HUDI-4575
> URL: https://issues.apache.org/jira/browse/HUDI-4575
> Project: Apache Hudi
> Issue Type: Bug
> Components: kafka-connect
> Reporter: Vishal Agarwal
> Priority: Critical
>
> Hi team,
> I am trying to run the Hudi Sink Connector with Kafka Connect. When the
> connector starts, it launches the Transaction Coordinator, which initialises
> the global committed offsets from the latest Hudi commit file. On a
> first-time run there is no commit file, so it logs:
> [2022-08-08 19:58:20,529] INFO Hoodie Extra Metadata from latest commit is
> absent (org.apache.hudi.connect.writers.KafkaConnectTransactionServices:147)
> But if, on that first run, the earliest Kafka offset is not 0, the process
> keeps running commit timelines without making progress. Ideally, on the
> first run the global offsets should be initialised to the earliest Kafka
> offsets.
> In the current implementation, the participant compares its local offset
> with the coordinator's offset and, on a mismatch, resets it to 0. This
> breaks on a fresh run where the earliest committed Kafka offset is not 0.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)