saimigo opened a new issue, #5145:
URL: https://github.com/apache/iceberg/issues/5145
When I read Iceberg tables through the Flink HiveCatalog, backed by S3A storage, I get this error:
```
Exception in thread "main" org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate the hadoop input format
    at org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createMRSplits(HiveSourceFileEnumerator.java:100)
    at org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:71)
    at org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:149)
    at org.apache.flink.connectors.hive.HiveParallelismInference.logRunningTime(HiveParallelismInference.java:107)
    at org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:95)
    at org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:144)
    at org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:114)
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan.translateToPlanInternal(CommonExecTableSourceScan.java:106)
    at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.java:49)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
    at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.java:58)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at org.apache.flink.table.planner.delegation.BatchPlanner.$anonfun$translateToPlan$1(BatchPlanner.scala:82)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:81)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274)
    at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:601)
    at loshu.flink.hive.FlinkSQLHiveWriter.main(FlinkSQLHiveWriter.java:69)
Caused by: java.lang.InstantiationException
    at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
    at org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createMRSplits(HiveSourceFileEnumerator.java:98)
    ... 30 more
16:33:36,767 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - Stopping s3a-file-system metrics system...
16:33:36,767 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - s3a-file-system metrics system stopped.
16:33:36,768 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - s3a-file-system metrics system shutdown complete.
Process finished with exit code 1
```
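For reference, the `Caused by` frames show `Class.newInstance()` being called from `HiveSourceFileEnumerator.createMRSplits`, and `Class.newInstance()` throws `InstantiationException` when the target class is abstract, an interface, or has no nullary constructor. The snippet below is a minimal, hypothetical sketch of that mechanism only; the class is made up and is not the actual Flink code path:

```java
// Hypothetical sketch of the failure mechanism in the trace above:
// Class.newInstance() throws InstantiationException when the target
// class has no nullary (no-arg) constructor.
public class NewInstanceRepro {

    // Stands in for an input format whose only constructor takes arguments.
    static class NoNullaryCtor {
        NoNullaryCtor(String required) {
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            NoNullaryCtor.class.newInstance();
        } catch (InstantiationException e) {
            // Same exception type as the "Caused by" in the report.
            System.out.println("caught: " + e);
        }
    }
}
```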
My code is:
```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.log4j.Logger;

public class FlinkSQLHiveWriter {

    private static final Logger log = Logger.getLogger(FlinkSQLHiveWriter.class);

    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
        System.setProperty("hadoop.home.dir", "/opt/hadoop-3.2.1/");

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String catalogName = "s3IcebergCatalog";
        String defaultDatabase = "s3a_flink";
        String hiveConfDir = "flink-cloud/src/main/resources";

        HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog(catalogName, hive);
        tableEnv.useCatalog(catalogName);
        tableEnv.useDatabase(defaultDatabase);

        // Metadata calls against the catalog succeed.
        System.out.println(hive.listDatabases());
        System.out.println(hive.listTables(defaultDatabase));

        String tableName = "icebergTBCloudTracking";
        // Use the default (Flink) SQL dialect, not the Hive dialect.
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        String sql = "select vin from " + tableName;
        // String sql = "DESC " + tableName;
        System.out.println(sql);

        Table table = tableEnv.sqlQuery(sql);
        table.execute(); // fails here (FlinkSQLHiveWriter.java:69 in the trace)
    }
}
```
I can "show tables" or "describe tables", but when using "select * from
table" the error occurs.
## Environment

- Flink 1.14.2
- Hive 3.1.2
- Scala 2.12
- Iceberg 0.12.1
- Hadoop 3.2.1