ashajyothi828 opened a new issue #2209:
URL: https://github.com/apache/iceberg/issues/2209


   Hi Team,
   
   I am new to Flink and Iceberg. I am trying to add rows to an existing Iceberg table from a Flink job using Flink SQL. I can do this from the Flink SQL client on the command line, but when I run the same statements from Java I get an error: the planner is looking for a 'connector' option, and I could not work out from the documentation which Iceberg connector details I need to pass in from Java code. Can you please help me with this?
   Below is my code and the error that I'm getting. Thanks in advance.
   
   ```
   TableEnvironment tEnv = TableEnvironment.create(
           EnvironmentSettings.newInstance()
                   .useBlinkPlanner()
                   .inBatchMode()
                   .withBuiltInCatalogName("flink_hadoop_catalog")
                   .withBuiltInDatabaseName("flink_iceberg_db")
                   .build());
   
   try {
       exec(tEnv, "CREATE TABLE flink_hadoop_catalog.flink_iceberg_db.transactions%d"
               + " (AccountId BIGINT, Timestamp1 BIGINT, Amount DOUBLE)", value.getLong(0));
   } catch (AlreadyExistsException e) {
       // do nothing
   }
   
   String query = "INSERT INTO %s VALUES (%d,%d,%f)";
   TableResult tr = exec(tEnv, query,
           "flink_hadoop_catalog.flink_iceberg_db.transactions" + value.getLong(0),
           value.getLong(0), value.getLong(1), value.getDouble(2));
   ```
   ```
   
   Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'flink_hadoop_catalog.flink_iceberg_db.transactions1'.
   
   Table options are:
        at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
        at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:349)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
        at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:789)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:691)
        at spendreport.TransactionSinkFunction2.exec(TransactionSinkFunction2.java:123)
        at spendreport.TransactionSinkFunction2.invoke(TransactionSinkFunction2.java:102)
        at spendreport.TransactionSinkFunction2.invoke(TransactionSinkFunction2.java:55)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
        at spendreport.FraudDetector.processElement(FraudDetector.java:136)
        at spendreport.FraudDetector.processElement(FraudDetector.java:45)
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:185)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.flink.table.api.ValidationException: Table options do not contain an option key 'connector' for discovering a connector.
        at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321)
        at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
        ... 42 more
   
   ```
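   
   From the Iceberg Flink documentation I suspect the table needs to live in an Iceberg catalog registered with `CREATE CATALOG ... WITH ('type'='iceberg', ...)` rather than in the built-in in-memory catalog, but I could not confirm this. For reference, here is a minimal sketch of what I think the Java side would look like; the warehouse path, table name, and values are placeholders rather than my actual setup.
   
   ```
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;
   
   public class IcebergInsertSketch {
   
       public static void main(String[] args) {
           TableEnvironment tEnv = TableEnvironment.create(
                   EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
   
           // Register an Iceberg catalog backed by a Hadoop warehouse;
           // the warehouse path below is only a placeholder.
           tEnv.executeSql(
                   "CREATE CATALOG flink_hadoop_catalog WITH ("
                           + " 'type'='iceberg',"
                           + " 'catalog-type'='hadoop',"
                           + " 'warehouse'='hdfs://namenode:8020/warehouse/path',"
                           + " 'property-version'='1')");
   
           tEnv.executeSql("CREATE DATABASE IF NOT EXISTS flink_hadoop_catalog.flink_iceberg_db");
   
           // Table name and values are placeholders for what the job builds dynamically.
           tEnv.executeSql(
                   "CREATE TABLE IF NOT EXISTS flink_hadoop_catalog.flink_iceberg_db.transactions1"
                           + " (AccountId BIGINT, Timestamp1 BIGINT, Amount DOUBLE)");
   
           tEnv.executeSql(
                   "INSERT INTO flink_hadoop_catalog.flink_iceberg_db.transactions1 VALUES (1, 1, 100.0)");
       }
   }
   ```
   
   Is this the right direction, or is there a different set of catalog/connector options I should be passing from Java?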

