soumilshah1995 opened a new issue, #7591:
URL: https://github.com/apache/hudi/issues/7591

   Good morning,
   I am well aware that we can spin up Flink on EMR and get this to work, but many people would love to use the natively managed Apache Flink that is available in the Kinesis Data Analytics console. My question is: has a request been made by others to support Hudi in Managed Kinesis Data Analytics (Flink)?
   ### Steps
   ```
   %flink.ssql
   
   DROP TABLE if exists tbl_orders;
   
   CREATE TABLE tbl_orders (
       orderid VARCHAR,
       customer_id VARCHAR,
       ts TIMESTAMP(3),
       order_value DOUBLE,
       priority VARCHAR,
       WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
   
   )
   WITH (
       'connector' = 'kinesis',
       'stream' = 'order_streams',
       'aws.region' = 'us-east-1',
       'scan.stream.initpos' = 'LATEST',
       'format' = 'json',
       'json.timestamp-format.standard' = 'ISO-8601'
       );
   
   
   DROP TABLE if exists tbl_orders_hudi;
   
   CREATE TABLE tbl_orders_hudi (
       orderid VARCHAR,
       customer_id VARCHAR,
       order_value DOUBLE
   
   )
   WITH (
       'connector' = 'hudi',
       'path' = 's3://glue-learn-begineers/tmp/',
       'table.type' = 'MERGE_ON_READ' 
       );
   
   ```
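   The `hudi` connector identifier is only discoverable when a Hudi Flink bundle jar is on the interpreter's classpath. A minimal sketch of how one might register such a jar in a KDA Studio (Zeppelin) notebook, assuming the interpreter honors the `flink.execution.jars` property; the bucket, path, and bundle version below are placeholders, not values from this issue:
   ```
   %flink.conf

   # Hypothetical: ship a Hudi Flink bundle to the job classpath.
   # Jar location and version are illustrative only.
   flink.execution.jars s3://my-bucket/jars/hudi-flink-bundle_2.12-0.12.1.jar
   ```
   A `%flink.conf` paragraph typically needs to run before any `%flink.ssql` paragraph in the same session for the setting to take effect.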
   #### Able to select data from my Kinesis stream
   
![image](https://user-images.githubusercontent.com/39345855/210243080-98500324-0c81-413a-8072-45a8d0668b0b.png)
   
   #### Writing to the Hudi table fails
   ```
   %flink.ssql(type=update)
   
   INSERT INTO tbl_orders_hudi
   SELECT 
       orderid ,
       customer_id ,
       order_value 
   FROM tbl_orders;
   
   ```
   
   #### Error Messages 
   
   ```
   Unable to create a sink for writing table 'hive.streamdb.tbl_orders_hudi'.
   
   Table options are:
   
   'connector'='hudi'
   'path'='s3://glue-learn-begineers/tmp/'
   'table.type'='MERGE_ON_READ'
   java.io.IOException: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'hive.streamdb.tbl_orders_hudi'.

   Table options are:

   'connector'='hudi'
   'path'='s3://glue-learn-begineers/tmp/'
   'table.type'='MERGE_ON_READ'
        at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
        at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
        at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
        at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
        at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
        at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
        at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
        at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
        at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
        at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
        at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
        at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'hive.streamdb.tbl_orders_hudi'.

   Table options are:

   'connector'='hudi'
   'path'='s3://glue-learn-begineers/tmp/'
   'table.type'='MERGE_ON_READ'
        at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171)
        at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:373)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:201)
        at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1510)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1460)
        at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
        ... 14 more
   Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'hive.streamdb.tbl_orders_hudi'.

   Table options are:

   'connector'='hudi'
   'path'='s3://glue-learn-begineers/tmp/'
   'table.type'='MERGE_ON_READ'
        at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171)
        at org.apache.flink.connectors.hive.HiveDynamicTableFactory.createDynamicTableSink(HiveDynamicTableFactory.java:81)
        at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:168)
        ... 32 more
   Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='hudi'
        at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:467)
        at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:441)
        at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:167)
        ... 34 more
   Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hudi' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

   Available factory identifiers are:

   blackhole
   datagen
   filesystem
   kafka
   kinesis
   print
   upsert-kafka
        at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
        at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463)
        ... 36 more
   ```
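   The innermost `Caused by` is the real failure: `Could not find any factory for identifier 'hudi'`, and the list of available factory identifiers shows that only the built-in connectors are present, so the Hive-catalog wrappers above it are incidental. If a Hudi bundle jar were added to the classpath, a quick smoke test could confirm factory discovery before wiring up the Kinesis source; this is a sketch with an illustrative table name and S3 path, not values from this issue:
   ```
   %flink.ssql

   -- Hypothetical smoke test: succeeds only if the 'hudi' factory is discoverable.
   -- Table name and path are illustrative.
   CREATE TABLE tbl_hudi_smoke (
       id VARCHAR PRIMARY KEY NOT ENFORCED,
       val DOUBLE
   )
   WITH (
       'connector' = 'hudi',
       'path' = 's3://my-bucket/tmp/tbl_hudi_smoke',
       'table.type' = 'COPY_ON_WRITE'
   );

   INSERT INTO tbl_hudi_smoke VALUES ('a', 1.0);
   ```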
   
   Am I correct in assuming that Hudi is not natively supported in KDA?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
