ibudanaev-crunchyroll opened a new issue #3707: URL: https://github.com/apache/hudi/issues/3707
Hi, we are trying to use Hudi with Flink in AWS Kinesis Data Analytics Studio. The following Python code produces data and inserts it into a Kinesis stream:

```python
import datetime
import json
import random
import time
import uuid

import boto3

# Client setup added for completeness; region and credentials are assumed
kinesis = boto3.client('kinesis', region_name='us-west-2')

def getReferrer():
    data = {}
    now = datetime.datetime.now()
    str_now = now.isoformat()
    data['uuid'] = str(uuid.uuid4())
    data['event_time'] = str_now
    data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
    price = random.random() * 100
    data['price'] = round(price, 2)
    return data

while True:
    data = json.dumps(getReferrer())
    print(data)
    res = kinesis.put_record(
        StreamName="stream-name",
        Data=data,
        PartitionKey="partitionkey")
    time.sleep(1)
```

In Zeppelin, we create a table as a source ingesting the data generated above:

```sql
%flink.ssql(type=update)
drop table stock_table;

CREATE TABLE stock_table (
  uuid VARCHAR(36),
  ticker VARCHAR(6),
  price DOUBLE,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
PARTITIONED BY (ticker)
WITH (
  'connector' = 'kinesis',
  'stream' = 'stream-name',
  'aws.region' = 'us-west-2',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601');
```

Next, we create a Hudi table on S3 to use as the sink:

```sql
%flink.ssql(type=update)
drop table t1;

CREATE TABLE t1 (
  uuid VARCHAR(36),
  ticker VARCHAR(6),
  price DOUBLE,
  ts TIMESTAMP(3)
)
PARTITIONED BY (ticker)
WITH (
  'connector' = 'hudi',
  'path' = 's3://bucket-name/hudi/',
  'table.type' = 'MERGE_ON_READ' -- creates a MERGE_ON_READ table; the default is COPY_ON_WRITE
);
```

Finally, we insert data from the stream into S3 via Hudi:

```sql
%flink.ssql(type=update)
-- insert data from the Kinesis source table
INSERT INTO t1 SELECT uuid, ticker, price, event_time AS ts FROM stock_table;
```

We get the following error when running the statement above:

```
java.lang.NoClassDefFoundError: org/apache/flink/table/connector/sink/DataStreamSinkProvider
    at org.apache.hudi.table.HoodieTableSink.getSinkRuntimeProvider(HoodieTableSink.java:81)
    at org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:69)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:108)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
    at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:67)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1256)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1213)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
    at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
    at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
    at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
    at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.connector.sink.DataStreamSinkProvider
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    ... 38 more
```

We tried each of the jar files as custom connectors (`hudi-flink-bundle_2.11-0.9.0.jar`, `hudi-flink-bundle_2.11-0.8.0.jar`, and `hudi-flink-bundle_2.11-0.7.0.jar`), all downloaded from [here](https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink-bundle_2.11/).

Your advice/assistance in this matter is much appreciated.
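One way to narrow this down: our understanding is that `DataStreamSinkProvider` ships with the Flink distribution itself (it is `provided`-scope for the Hudi bundle, and we believe it was only introduced around Flink 1.12), so if the notebook's Flink runtime predates it, no choice of `hudi-flink-bundle` jar would help. A minimal probe, assuming a Zeppelin `%flink` (Scala) paragraph is available, is just a sketch:

```scala
// Sketch of a classpath probe, run in a Zeppelin %flink (Scala) paragraph.
// If this throws ClassNotFoundException, the cluster's Flink distribution
// does not ship the class (we believe it was added in Flink 1.12), which
// would explain the NoClassDefFoundError from the Hudi sink regardless of
// which hudi-flink-bundle jar is attached as a custom connector.
try {
  Class.forName("org.apache.flink.table.connector.sink.DataStreamSinkProvider")
  println("DataStreamSinkProvider is on the classpath")
} catch {
  case _: ClassNotFoundException =>
    println("DataStreamSinkProvider is missing from the Flink runtime")
}
```

If the class turns out to be missing, the next step would presumably be checking which Flink version the Studio environment actually runs and matching the Hudi bundle to it.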