[ 
https://issues.apache.org/jira/browse/HUDI-4575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vishal Agarwal updated HUDI-4575:
---------------------------------
    Description: 
Hi team,
I am trying to run the Hudi Sink Connector with Kafka Connect. When the connector 
starts, it starts the transaction coordinator, which initialises the global 
committed offsets from the Hudi commit file. On a first-time run there is no 
commit file, and hence it outputs
[2022-08-08 19:58:20,529] INFO Hoodie Extra Metadata from latest commit is 
absent (org.apache.hudi.connect.writers.KafkaConnectTransactionServices:147) 
But if, on that first run, the earliest Kafka offset is not 0, the process 
keeps re-running the commit timeline. Ideally, on the first run, the global 
offsets should be set to the earliest Kafka offsets.
As per the current implementation, the participant compares its local offset 
with the coordinator offset and, on a mismatch, resets it to 0. This breaks 
on a fresh run when the earliest committed Kafka offset is not 0.
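To illustrate the suggested behaviour, here is a minimal sketch (hypothetical class and method names, not the actual Hudi connect classes): when no committed offset exists in the Hudi commit metadata, bootstrap from the earliest Kafka offset instead of hard-coding 0.

```java
import java.util.OptionalLong;

// Hypothetical sketch of the proposed fix: on a fresh table with no commit
// file, fall back to the earliest Kafka offset for the partition instead of
// 0, so topics whose log start offset is non-zero do not trigger an endless
// mismatch/reset loop between participant and coordinator.
public class OffsetBootstrapSketch {

  static long startingOffset(OptionalLong committedOffset, long earliestKafkaOffset) {
    // If the Hudi commit metadata has an offset for this partition, resume
    // there; otherwise bootstrap from the earliest offset the broker retains.
    return committedOffset.orElse(earliestKafkaOffset);
  }

  public static void main(String[] args) {
    // Fresh run where retention has trimmed the log so the earliest offset is 42:
    System.out.println(startingOffset(OptionalLong.empty(), 42L));
    // Normal resume from a committed offset:
    System.out.println(startingOffset(OptionalLong.of(100L), 42L));
  }
}
```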

  was:
Hi team,

I am trying to use the Hudi sink connector with Kafka Connect to write to a GCS 
bucket, but I am getting an error regarding the "gs" file scheme. I have added 
all GCS-related properties in core-site.xml and the corresponding gcs-connector 
jar in the plugin path, but I am still facing the issue.
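For reference, the "gs" scheme usually resolves only when the gcs-connector jar is on the classpath and its classes are registered with Hadoop. A minimal core-site.xml fragment follows; the property names come from the standard GCS connector, but whether they match this particular setup is an assumption on my side.

```xml
<!-- Hypothetical core-site.xml fragment: registers the GCS connector
     implementations so Hadoop can resolve the "gs" scheme. -->
<configuration>
  <property>
    <name>fs.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
  </property>
  <property>
    <name>fs.AbstractFileSystem.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
  </property>
</configuration>
```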

The issue was already reported for S3 in 
https://issues.apache.org/jira/browse/HUDI-3610, but I am unable to find a 
resolution there.

Happy to discuss this!

Thanks


*StackTrace-*

%d [%thread] %-5level %logger - %msg%n
org.apache.hudi.exception.HoodieException: Fatal error instantiating Hudi Write Provider
 at org.apache.hudi.connect.writers.KafkaConnectWriterProvider.<init>(KafkaConnectWriterProvider.java:103) ~[connectors-uber.jar:?]
 at org.apache.hudi.connect.transaction.ConnectTransactionParticipant.<init>(ConnectTransactionParticipant.java:65) ~[connectors-uber.jar:?]
 at org.apache.hudi.connect.HoodieSinkTask.bootstrap(HoodieSinkTask.java:198) [connectors-uber.jar:?]
 at org.apache.hudi.connect.HoodieSinkTask.open(HoodieSinkTask.java:151) [connectors-uber.jar:?]
 at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:587) [connect-runtime-2.4.1.jar:?]
 at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:67) [connect-runtime-2.4.1.jar:?]
 at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:652) [connect-runtime-2.4.1.jar:?]
 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272) [kafka-clients-2.4.1.jar:?]
 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400) [kafka-clients-2.4.1.jar:?]
 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421) [kafka-clients-2.4.1.jar:?]
 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340) [kafka-clients-2.4.1.jar:?]
 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471) [kafka-clients-2.4.1.jar:?]
 at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267) [kafka-clients-2.4.1.jar:?]
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) [kafka-clients-2.4.1.jar:?]
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) [kafka-clients-2.4.1.jar:?]
 at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444) [connect-runtime-2.4.1.jar:?]
 at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317) [connect-runtime-2.4.1.jar:?]
 at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224) [connect-runtime-2.4.1.jar:?]
 at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192) [connect-runtime-2.4.1.jar:?]
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177) [connect-runtime-2.4.1.jar:?]
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227) [connect-runtime-2.4.1.jar:?]
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_331]
 at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_331]
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_331]
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_331]
 at java.lang.Thread.run(Thread.java:750) [?:1.8.0_331]
Caused by: org.apache.hudi.exception.HoodieIOException: Failed to get instance of org.apache.hadoop.fs.FileSystem
 at org.apache.hudi.common.fs.FSUtils.getFs(FSUtils.java:109) ~[connectors-uber.jar:?]
 at org.apache.hudi.common.fs.FSUtils.getFs(FSUtils.java:100) ~[connectors-uber.jar:?]
 at org.apache.hudi.client.BaseHoodieClient.<init>(BaseHoodieClient.java:69) ~[connectors-uber.jar:?]
 at org.apache.hudi.client.BaseHoodieWriteClient.<init>(BaseHoodieWriteClient.java:175) ~[connectors-uber.jar:?]
 at org.apache.hudi.client.BaseHoodieWriteClient.<init>(BaseHoodieWriteClient.java:160) ~[connectors-uber.jar:?]
 at org.apache.hudi.client.HoodieJavaWriteClient.<init>(HoodieJavaWriteClient.java:55) ~[connectors-uber.jar:?]
 at org.apache.hudi.connect.writers.KafkaConnectWriterProvider.<init>(KafkaConnectWriterProvider.java:101) ~[connectors-uber.jar:?]
 ... 25 more
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "gs"
 at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3225) ~[connectors-uber.jar:?]
 at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3245) ~[connectors-uber.jar:?]
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:121) ~[connectors-uber.jar:?]
 at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3296) ~[connectors-uber.jar:?]
 at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3264) ~[connectors-uber.jar:?]
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:475) ~[connectors-uber.jar:?]
 at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356) ~[connectors-uber.jar:?]
 at org.apache.hudi.common.fs.FSUtils.getFs(FSUtils.java:107) ~[connectors-uber.jar:?]
 at org.apache.hudi.common.fs.FSUtils.getFs(FSUtils.java:100) ~[connectors-uber.jar:?]
 at org.apache.hudi.client.BaseHoodieClient.<init>(BaseHoodieClient.java:69) ~[connectors-uber.jar:?]
 at org.apache.hudi.client.BaseHoodieWriteClient.<init>(BaseHoodieWriteClient.java:175) ~[connectors-uber.jar:?]
 at org.apache.hudi.client.BaseHoodieWriteClient.<init>(BaseHoodieWriteClient.java:160) ~[connectors-uber.jar:?]
 at org.apache.hudi.client.HoodieJavaWriteClient.<init>(HoodieJavaWriteClient.java:55) ~[connectors-uber.jar:?]
 at org.apache.hudi.connect.writers.KafkaConnectWriterProvider.<init>(KafkaConnectWriterProvider.java:101) ~[connectors-uber.jar:?]
 ... 25 more


> Initial Kafka Global Offsets  in Hudi Kafka Sink Connector
> ----------------------------------------------------------
>
>                 Key: HUDI-4575
>                 URL: https://issues.apache.org/jira/browse/HUDI-4575
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: kafka-connect
>            Reporter: Vishal Agarwal
>            Priority: Critical
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
