soumilshah1995 opened a new issue, #7591:
URL: https://github.com/apache/hudi/issues/7591
Good morning,

I am well aware that we can spin up Flink on EMR and get this to work. However, a lot of people would love to use the natively managed Apache Flink that is available in the Kinesis Data Analytics (KDA) console. My question is: has anyone else requested support for Hudi in managed Kinesis Data Analytics (Flink)?
### Steps
```
%flink.ssql
DROP TABLE if exists tbl_orders;
CREATE TABLE tbl_orders (
orderid VARCHAR,
customer_id VARCHAR,
ts TIMESTAMP(3),
order_value DOUBLE,
priority VARCHAR,
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kinesis',
'stream' = 'order_streams',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
DROP TABLE if exists tbl_orders_hudi;
CREATE TABLE tbl_orders_hudi (
orderid VARCHAR,
customer_id VARCHAR,
order_value DOUBLE
)
WITH (
'connector' = 'hudi',
'path' = 's3://glue-learn-begineers/tmp/',
'table.type' = 'MERGE_ON_READ'
);
```
#### Able to select data from my Kinesis streams

#### Fails
```
%flink.ssql(type=update)
INSERT INTO tbl_orders_hudi
SELECT
orderid,
customer_id,
order_value
FROM tbl_orders;
```
#### Error Messages
```
Unable to create a sink for writing table 'hive.streamdb.tbl_orders_hudi'.
Table options are:
'connector'='hudi'
'path'='s3://glue-learn-begineers/tmp/'
'table.type'='MERGE_ON_READ'
java.io.IOException: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'hive.streamdb.tbl_orders_hudi'.
Table options are:
'connector'='hudi'
'path'='s3://glue-learn-begineers/tmp/'
'table.type'='MERGE_ON_READ'
	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
	at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
	at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
	at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
	at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
	at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
	at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'hive.streamdb.tbl_orders_hudi'.
Table options are:
'connector'='hudi'
'path'='s3://glue-learn-begineers/tmp/'
'table.type'='MERGE_ON_READ'
	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171)
	at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:373)
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:201)
	at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
	at scala.collection.Iterator.foreach(Iterator.scala:937)
	at scala.collection.Iterator.foreach$(Iterator.scala:937)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1510)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1460)
	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
	... 14 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'hive.streamdb.tbl_orders_hudi'.
Table options are:
'connector'='hudi'
'path'='s3://glue-learn-begineers/tmp/'
'table.type'='MERGE_ON_READ'
	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171)
	at org.apache.flink.connectors.hive.HiveDynamicTableFactory.createDynamicTableSink(HiveDynamicTableFactory.java:81)
	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:168)
	... 32 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='hudi'
	at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:467)
	at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:441)
	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:167)
	... 34 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hudi' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
kafka
kinesis
print
upsert-kafka
	at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
	at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463)
	... 36 more
```
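The root `Caused by` states that the notebook's Flink runtime found no table factory with the identifier `hudi`. The only identifiers on its classpath are the ones listed (`blackhole`, `datagen`, `filesystem`, `kafka`, `kinesis`, `print`, `upsert-kafka`). Flink discovers these factories through Java's `ServiceLoader`, which reads the `META-INF/services/org.apache.flink.table.factories.Factory` file inside each jar.

The sketch below checks whether a given jar registers any Flink table factory at all, for example a Hudi Flink bundle jar you intend to attach. It assumes only the Python standard library. `registered_factories` is a hypothetical helper, not part of Flink or Hudi. It lists class names only, because the `hudi` identifier itself is returned by the factory class at runtime and does not appear in the service file.

```python
import sys
import zipfile
from typing import IO, List, Union

# Service file that Java's ServiceLoader reads when Flink discovers table factories.
SERVICE_FILE = "META-INF/services/org.apache.flink.table.factories.Factory"


def registered_factories(jar: Union[str, IO[bytes]]) -> List[str]:
    """Return the factory class names a jar registers for Flink SPI discovery.

    Hypothetical helper for illustration; `jar` is a path or a binary file object.
    """
    with zipfile.ZipFile(jar) as archive:
        if SERVICE_FILE not in archive.namelist():
            return []
        text = archive.read(SERVICE_FILE).decode("utf-8")
    entries = []
    for line in text.splitlines():
        # ServiceLoader ignores anything after '#' as well as surrounding whitespace.
        entry = line.split("#", 1)[0].strip()
        if entry:
            entries.append(entry)
    return entries


if __name__ == "__main__":
    # Usage: python check_factories.py some.jar [other.jar ...]
    for path in sys.argv[1:]:
        print(path, registered_factories(path) or "no Flink table factories registered")
```

If a jar reports no registered factories, adding it to the notebook would not make `'connector' = 'hudi'` resolvable. The script name `check_factories.py` is an illustrative assumption.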
I am assuming that Hudi is not supported natively in KDA. Am I correct?