ibudanaev-crunchyroll opened a new issue #3707:
URL: https://github.com/apache/hudi/issues/3707


   Hi,
   
   We are trying to use Hudi in AWS Kinesis Data Analytics Studio together with 
Flink.
   
   The following Python code produces data and inserts it into a Kinesis stream:
   ```
   import datetime
   import json
   import random
   import time
   import uuid
   
   import boto3
   
   # Kinesis client; the region matches the 'aws.region' used in the Flink source table
   kinesis = boto3.client("kinesis", region_name="us-west-2")
   
   
   def getReferrer():
       data = {}
       now = datetime.datetime.now()
       data['uuid'] = str(uuid.uuid4())
       data['event_time'] = now.isoformat()
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       data['price'] = round(random.random() * 100, 2)
       return data
   
   
   while True:
       data = json.dumps(getReferrer())
       print(data)
       res = kinesis.put_record(
           StreamName="stream-name",
           Data=data,
           PartitionKey="partitionkey")
       time.sleep(1)
   ```
   
   In Zeppelin, we create a source table that ingests the generated data:
   ```
   %flink.ssql(type=update)
   
   DROP TABLE IF EXISTS stock_table;
   
   CREATE TABLE stock_table (
     uuid VARCHAR(36),
     ticker VARCHAR(6),
     price DOUBLE,
     event_time TIMESTAMP(3),
     WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
   )
   PARTITIONED BY (ticker)
   WITH (
     'connector' = 'kinesis',
     'stream' = 'stream-name',
     'aws.region' = 'us-west-2',
     'scan.stream.initpos' = 'LATEST',
     'format' = 'json',
     'json.timestamp-format.standard' = 'ISO-8601'
   );
   ```
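
   As a quick sanity check (not part of the pipeline itself), the producer's JSON payloads can be validated against the columns declared above. The helper below is a hypothetical sketch: it checks field names, types, and that `event_time` parses as ISO-8601, which the `'json.timestamp-format.standard' = 'ISO-8601'` option expects:

```python
import datetime
import json

# Expected shape of a producer record, mirroring the stock_table columns
EXPECTED_FIELDS = {"uuid": str, "ticker": str, "price": float, "event_time": str}


def validate_record(raw: bytes) -> dict:
    """Parse one Kinesis record payload and check it against the table schema."""
    data = json.loads(raw)
    for field, ftype in EXPECTED_FIELDS.items():
        if not isinstance(data.get(field), ftype):
            raise ValueError(f"field {field!r} has unexpected value {data.get(field)!r}")
    # event_time must parse as ISO-8601 for the JSON format to yield TIMESTAMP(3)
    datetime.datetime.fromisoformat(data["event_time"])
    return data
```

   Running this against a few records from the stream can rule out schema drift before debugging the sink.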
   
   Next, we create a Hudi table on S3 to use as the sink:
   ```
   %flink.ssql(type=update)
   
   DROP TABLE IF EXISTS t1;
   
   CREATE TABLE t1 (
     uuid VARCHAR(36),
     ticker VARCHAR(6),
     price DOUBLE,
     ts TIMESTAMP(3)
   )
   PARTITIONED BY (ticker)
   WITH (
     'connector' = 'hudi',
     'path' = 's3://bucket-name/hudi/',
     'table.type' = 'MERGE_ON_READ'  -- creates a MERGE_ON_READ table; the default is COPY_ON_WRITE
   );
   ```
   
   Finally, we want to insert the streaming data into S3 via Hudi:
   ```
   %flink.ssql(type=update)
   -- insert the streaming rows into the Hudi table
   INSERT INTO t1 SELECT uuid, ticker, price, event_time AS ts FROM stock_table;
   ```
   
   We get the following error when trying to run the above code:
   ```
   java.lang.NoClassDefFoundError: org/apache/flink/table/connector/sink/DataStreamSinkProvider
        at org.apache.hudi.table.HoodieTableSink.getSinkRuntimeProvider(HoodieTableSink.java:81)
        at org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:69)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:108)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
        at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:67)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1256)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1213)
        at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
        at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
        at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
        at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
        at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
        at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
        at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
        at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
        at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
        at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
        at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
        at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.connector.sink.DataStreamSinkProvider
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
        ... 38 more
   ```
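
   The `NoClassDefFoundError` means `DataStreamSinkProvider` is absent at runtime; since that interface was only introduced in Flink 1.12, this usually points to a version mismatch between the Hudi bundle and the Flink runtime behind the notebook. To see whether a given bundle or Flink jar actually ships a class, here is a small hypothetical helper (assuming local access to the jar files):

```python
import zipfile


def jar_contains_class(jar_path: str, class_name: str) -> bool:
    """Return True if the jar ships a compiled .class entry for class_name."""
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()
```

   Checking both the Hudi bundle and the cluster's `flink-table` jars this way shows which side is missing the class.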
   
   We tried each of `hudi-flink-bundle_2.11-0.9.0.jar`, 
`hudi-flink-bundle_2.11-0.8.0.jar`, and `hudi-flink-bundle_2.11-0.7.0.jar` 
(downloaded from 
[here](https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink-bundle_2.11/)) as a custom connector.
   
   Your advice/assistance in this matter is much appreciated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
