[ 
https://issues.apache.org/jira/browse/FLINK-28337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17561354#comment-17561354
 ] 

Martijn Visser commented on FLINK-28337:
----------------------------------------

[~migowei] I mean that the Flink community is not involved in integrating with 
Apache Iceberg. The Iceberg project maintains its own Flink integration, so for 
questions about using Iceberg with Flink you should reach out to the Iceberg 
project. 
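
For reference, the Iceberg-maintained Flink integration registers its own catalog rather than going through Flink's Hive connector. A minimal sketch of that approach, assuming the `iceberg-flink-runtime` jar is on the classpath (the catalog name and warehouse path below are placeholders; the metastore URI is taken from the log in the issue):

{code:sql}
-- Register an Iceberg catalog backed by the Hive Metastore via Iceberg's
-- own Flink catalog implementation (not Flink's Hive connector).
CREATE CATALOG iceberg_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://hiveserver:9083',
  'warehouse'='s3a://warehouse'
);
USE CATALOG iceberg_catalog;
{code}

Reading the table through this catalog avoids the Hive connector code path shown in the stack trace below.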

> java.lang.IllegalArgumentException: Table identifier not set
> ------------------------------------------------------------
>
>                 Key: FLINK-28337
>                 URL: https://issues.apache.org/jira/browse/FLINK-28337
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>    Affects Versions: 1.14.2
>         Environment: Flink 1.14.2
> Hive 3.1.2
> Iceberg 0.12.1
> Hadoop 3.2.1
>            Reporter: wei
>            Priority: Major
>
> I use the Flink Table API to select from an Iceberg table. I registered a 
> HiveCatalog and set it as the current catalog, but it looks like the default 
> catalog is still used.
> The error message is as follows:
> {code:java}
> 0:42:41,886 INFO  org.apache.hadoop.metrics2.impl.MetricsSystemImpl           
>  [] - s3a-file-system metrics system started
> 10:42:44,392 INFO  org.apache.iceberg.BaseMetastoreCatalog                    
>   [] - Table loaded by catalog: 
> default_iceberg.s3a_flink.icebergtbcloudtrackingtest
> 10:42:44,397 INFO  org.apache.iceberg.mr.hive.HiveIcebergSerDe                
>   [] - Using schema from existing table 
> {"type":"struct","schema-id":0,"fields":[{"id":1,"name":"vin","required":true,"type":"string"},{"id":2,"name":"name","required":true,"type":"string"},{"id":3,"name":"uuid","required":false,"type":"string"},{"id":4,"name":"channel","required":true,"type":"string"},{"id":5,"name":"run_scene","required":true,"type":"string"},{"id":6,"name":"timestamp","required":true,"type":"timestamp"},{"id":7,"name":"rcv_timestamp","required":true,"type":"timestamp"},{"id":8,"name":"raw","required":true,"type":"string"}]}
> 10:42:44,832 INFO  org.apache.iceberg.BaseMetastoreTableOperations            
>   [] - Refreshing table metadata from new version: 
> s3a://warehouse/s3a_flink.db/icebergTBCloudTrackingTest/metadata/00011-8d1ef9f1-8172-49fd-b0de-58196642b662.metadata.json
> 10:42:44,866 INFO  org.apache.iceberg.BaseMetastoreCatalog                    
>   [] - Table loaded by catalog: 
> default_iceberg.s3a_flink.icebergtbcloudtrackingtest
> 10:42:44,867 INFO  org.apache.iceberg.mr.hive.HiveIcebergSerDe                
>   [] - Using schema from existing table 
> {"type":"struct","schema-id":0,"fields":[{"id":1,"name":"vin","required":true,"type":"string"},{"id":2,"name":"name","required":true,"type":"string"},{"id":3,"name":"uuid","required":false,"type":"string"},{"id":4,"name":"channel","required":true,"type":"string"},{"id":5,"name":"run_scene","required":true,"type":"string"},{"id":6,"name":"timestamp","required":true,"type":"timestamp"},{"id":7,"name":"rcv_timestamp","required":true,"type":"timestamp"},{"id":8,"name":"raw","required":true,"type":"string"}]}
> 10:42:48,079 INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient       
>   [] - Trying to connect to metastore with URI thrift://hiveserver:9083
> 10:42:48,079 INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient       
>   [] - Opened a connection to metastore, current connections: 3
> 10:42:48,081 INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient       
>   [] - Connected to metastore.
> 10:42:48,081 INFO  org.apache.hadoop.hive.metastore.RetryingMetaStoreClient   
>   [] - RetryingMetaStoreClient proxy=class 
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=root (auth:SIMPLE) 
> retries=1 delay=1 lifetime=0
> 10:42:48,132 INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient       
>   [] - Closed a connection to metastore, current connections: 2
> 10:42:48,308 INFO  org.apache.flink.connectors.hive.HiveParallelismInference  
>   [] - Hive source(s3a_flink.icebergTBCloudTrackingTest}) getNumFiles use 
> time: 171 ms, result: 2
> Exception in thread "main" java.lang.IllegalArgumentException: Table 
> identifier not set
>     at 
> org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:142)
>     at org.apache.iceberg.mr.Catalogs.loadTable(Catalogs.java:114)
>     at org.apache.iceberg.mr.Catalogs.loadTable(Catalogs.java:89)
>     at 
> org.apache.iceberg.mr.mapreduce.IcebergInputFormat.lambda$getSplits$0(IcebergInputFormat.java:102)
>     at java.util.Optional.orElseGet(Optional.java:267)
>     at 
> org.apache.iceberg.mr.mapreduce.IcebergInputFormat.getSplits(IcebergInputFormat.java:102)
>     at 
> org.apache.iceberg.mr.mapred.MapredIcebergInputFormat.getSplits(MapredIcebergInputFormat.java:69)
>     at 
> org.apache.iceberg.mr.hive.HiveIcebergInputFormat.getSplits(HiveIcebergInputFormat.java:98)
>     at 
> org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createMRSplits(HiveSourceFileEnumerator.java:107)
>     at 
> org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:71)
>     at 
> org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:149)
>     at 
> org.apache.flink.connectors.hive.HiveParallelismInference.logRunningTime(HiveParallelismInference.java:107)
>     at 
> org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:95)
>     at 
> org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:144)
>     at 
> org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:114)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan.translateToPlanInternal(CommonExecTableSourceScan.java:106)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.java:49)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.java:58)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at 
> org.apache.flink.table.planner.delegation.BatchPlanner.$anonfun$translateToPlan$1(BatchPlanner.scala:82)
>     at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104) {code}
> code is :
> {code:java}
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.SqlDialect;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.catalog.hive.HiveCatalog;
>
>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>                 .inBatchMode()
>                 .build();
>         TableEnvironment tableEnv = TableEnvironment.create(settings);
>         String catalogName = "s3IcebergCatalog";
>         String defaultDatabase = "s3a_flink";
>         String hiveConfDir = "flink-cloud/src/main/resources";
>         HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
>         tableEnv.registerCatalog(catalogName, hive);
>         tableEnv.useCatalog(catalogName);
>         tableEnv.useDatabase(defaultDatabase);
>         System.out.println(tableEnv.getCurrentCatalog());
>         String tableName = "icebergTBCloudTrackingTest";
>         tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>         String sql = "select uuid from " + tableName;
>         System.out.println(sql);
>         tableEnv.executeSql(sql).print();
>  {code}
> The output of `tableEnv.getCurrentCatalog()` is `s3IcebergCatalog`, but the 
> log still reports `10:42:44,866 INFO  org.apache.iceberg.BaseMetastoreCatalog    [] - 
> Table loaded by catalog: default_iceberg.s3a_flink.icebergtbcloudtrackingtest 
> `, and the job fails with `java.lang.IllegalArgumentException: Table identifier not set`.
> Does anyone know the reason?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)