[
https://issues.apache.org/jira/browse/FLINK-28337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17561354#comment-17561354
]
Martijn Visser commented on FLINK-28337:
----------------------------------------
[~migowei] I mean that the Flink community is not involved in the integration
with Apache Iceberg. Iceberg has created its own Flink integration for that, so
for questions about integrating Iceberg with Flink, you should reach out to the
Iceberg project. A rough sketch of what that route looks like follows below.
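For reference, a minimal sketch of that route: given a TableEnvironment like
the one in the report below, the table can be read through Iceberg's own Flink
catalog (from the iceberg-flink-runtime module) instead of Flink's Hive
connector. The catalog name "iceberg_catalog" is a hypothetical placeholder;
the metastore URI and warehouse path are taken from the logs in the report and
would need to be adjusted to the actual setup.
{code:java}
// Sketch only, assuming iceberg-flink-runtime is on the classpath.
// "iceberg_catalog" is a placeholder name, not a value from this issue.
tableEnv.executeSql(
    "CREATE CATALOG iceberg_catalog WITH (\n"
        + "  'type' = 'iceberg',\n"
        + "  'catalog-type' = 'hive',\n"
        + "  'uri' = 'thrift://hiveserver:9083',\n"
        + "  'warehouse' = 's3a://warehouse'\n"
        + ")");
tableEnv.executeSql("USE CATALOG iceberg_catalog");
tableEnv.executeSql("USE s3a_flink");
tableEnv.executeSql("SELECT uuid FROM icebergTBCloudTrackingTest").print();
{code}
With a catalog of type 'iceberg', table resolution is handled by Iceberg's
Flink integration rather than by the Hive connector's MapReduce split
enumeration, which is where the "Table identifier not set" error in the stack
trace below originates.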
> java.lang.IllegalArgumentException: Table identifier not set
> ------------------------------------------------------------
>
> Key: FLINK-28337
> URL: https://issues.apache.org/jira/browse/FLINK-28337
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Hive
> Affects Versions: 1.14.2
> Environment: Flink 1.14.2
> Hive 3.1.2
> Iceberg 0.12.1
> Hadoop 3.2.1
> Reporter: wei
> Priority: Major
>
> I use the Flink Table API to select from an Iceberg table. I set a
> HiveCatalog as the current catalog, but it looks like the default catalog is
> still used.
> The error message is as follows:
> {code:java}
> 10:42:41,886 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - s3a-file-system metrics system started
> 10:42:44,392 INFO org.apache.iceberg.BaseMetastoreCatalog [] - Table loaded by catalog: default_iceberg.s3a_flink.icebergtbcloudtrackingtest
> 10:42:44,397 INFO org.apache.iceberg.mr.hive.HiveIcebergSerDe [] - Using schema from existing table {"type":"struct","schema-id":0,"fields":[{"id":1,"name":"vin","required":true,"type":"string"},{"id":2,"name":"name","required":true,"type":"string"},{"id":3,"name":"uuid","required":false,"type":"string"},{"id":4,"name":"channel","required":true,"type":"string"},{"id":5,"name":"run_scene","required":true,"type":"string"},{"id":6,"name":"timestamp","required":true,"type":"timestamp"},{"id":7,"name":"rcv_timestamp","required":true,"type":"timestamp"},{"id":8,"name":"raw","required":true,"type":"string"}]}
> 10:42:44,832 INFO org.apache.iceberg.BaseMetastoreTableOperations [] - Refreshing table metadata from new version: s3a://warehouse/s3a_flink.db/icebergTBCloudTrackingTest/metadata/00011-8d1ef9f1-8172-49fd-b0de-58196642b662.metadata.json
> 10:42:44,866 INFO org.apache.iceberg.BaseMetastoreCatalog [] - Table loaded by catalog: default_iceberg.s3a_flink.icebergtbcloudtrackingtest
> 10:42:44,867 INFO org.apache.iceberg.mr.hive.HiveIcebergSerDe [] - Using schema from existing table {"type":"struct","schema-id":0,"fields":[{"id":1,"name":"vin","required":true,"type":"string"},{"id":2,"name":"name","required":true,"type":"string"},{"id":3,"name":"uuid","required":false,"type":"string"},{"id":4,"name":"channel","required":true,"type":"string"},{"id":5,"name":"run_scene","required":true,"type":"string"},{"id":6,"name":"timestamp","required":true,"type":"timestamp"},{"id":7,"name":"rcv_timestamp","required":true,"type":"timestamp"},{"id":8,"name":"raw","required":true,"type":"string"}]}
> 10:42:48,079 INFO org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Trying to connect to metastore with URI thrift://hiveserver:9083
> 10:42:48,079 INFO org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Opened a connection to metastore, current connections: 3
> 10:42:48,081 INFO org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Connected to metastore.
> 10:42:48,081 INFO org.apache.hadoop.hive.metastore.RetryingMetaStoreClient [] - RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=root (auth:SIMPLE) retries=1 delay=1 lifetime=0
> 10:42:48,132 INFO org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Closed a connection to metastore, current connections: 2
> 10:42:48,308 INFO org.apache.flink.connectors.hive.HiveParallelismInference [] - Hive source(s3a_flink.icebergTBCloudTrackingTest}) getNumFiles use time: 171 ms, result: 2
> Exception in thread "main" java.lang.IllegalArgumentException: Table identifier not set
>     at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:142)
>     at org.apache.iceberg.mr.Catalogs.loadTable(Catalogs.java:114)
>     at org.apache.iceberg.mr.Catalogs.loadTable(Catalogs.java:89)
>     at org.apache.iceberg.mr.mapreduce.IcebergInputFormat.lambda$getSplits$0(IcebergInputFormat.java:102)
>     at java.util.Optional.orElseGet(Optional.java:267)
>     at org.apache.iceberg.mr.mapreduce.IcebergInputFormat.getSplits(IcebergInputFormat.java:102)
>     at org.apache.iceberg.mr.mapred.MapredIcebergInputFormat.getSplits(MapredIcebergInputFormat.java:69)
>     at org.apache.iceberg.mr.hive.HiveIcebergInputFormat.getSplits(HiveIcebergInputFormat.java:98)
>     at org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createMRSplits(HiveSourceFileEnumerator.java:107)
>     at org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:71)
>     at org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:149)
>     at org.apache.flink.connectors.hive.HiveParallelismInference.logRunningTime(HiveParallelismInference.java:107)
>     at org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:95)
>     at org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:144)
>     at org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:114)
>     at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan.translateToPlanInternal(CommonExecTableSourceScan.java:106)
>     at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.java:49)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
>     at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.java:58)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at org.apache.flink.table.planner.delegation.BatchPlanner.$anonfun$translateToPlan$1(BatchPlanner.scala:82)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104) {code}
> The code is:
> {code:java}
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.SqlDialect;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.catalog.hive.HiveCatalog;
>
> // Register a HiveCatalog and make it the current catalog.
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
>         .inBatchMode()
>         .build();
> TableEnvironment tableEnv = TableEnvironment.create(settings);
> String catalogName = "s3IcebergCatalog";
> String defaultDatabase = "s3a_flink";
> String hiveConfDir = "flink-cloud/src/main/resources";
> HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
> tableEnv.registerCatalog(catalogName, hive);
> tableEnv.useCatalog(catalogName);
> tableEnv.useDatabase(defaultDatabase);
> System.out.println(tableEnv.getCurrentCatalog()); // prints "s3IcebergCatalog"
>
> // Query the Iceberg-backed Hive table with the default SQL dialect.
> String tableName = "icebergTBCloudTrackingTest";
> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> String sql = "select uuid from " + tableName;
> System.out.println(sql);
> tableEnv.executeSql(sql).print();
> {code}
> The output of `tableEnv.getCurrentCatalog()` is `s3IcebergCatalog`, yet the
> log reports `10:42:44,866 INFO org.apache.iceberg.BaseMetastoreCatalog [] -
> Table loaded by catalog: default_iceberg.s3a_flink.icebergtbcloudtrackingtest
> `, and the query fails with `java.lang.IllegalArgumentException: Table
> identifier not set`. Does anyone know the reason?
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)