[
https://issues.apache.org/jira/browse/FLINK-28276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17560806#comment-17560806
]
wei commented on FLINK-28276:
-----------------------------
Thanks! I think this is the cause of the problem. I create the table through the Flink/Iceberg SDK as follows:
{code:java}
Map<String, String> properties = new HashMap<>();
properties.put("type", "iceberg");
properties.put("clients", "5");
properties.put("property-version", "1");
properties.put("warehouse", "s3a://warehouse/");
properties.put("catalog-type", "hive");
properties.put("uri", "thrift://hiveserver:9083");

Configuration conf = new Configuration();
conf.set("fs.s3a.connection.ssl.enabled", "false");
conf.set("fs.s3a.endpoint", "http://mys3");
conf.set("fs.s3a.access.key", "lkmyCR8it8zhKFh1Shzo0PpTfu4IpTzR");
conf.set("fs.s3a.secret.key", "s0m1yB1LmnNdAuQQh3NLffPtZY9g18JN");
conf.set("fs.s3a.region", "us-east-1");
conf.set("fs.s3a.path.style.access", "true");
conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
conf.set("fs.s3a.fast.upload", "true");
conf.set("execution.checkpointing.checkpoints-after-tasks-finish.enabled", "true");

String HIVE_CATALOG = "s3IcebergCatalog";
CatalogLoader catalogLoader = CatalogLoader.hive(HIVE_CATALOG, conf, properties);
HiveCatalog catalog = (HiveCatalog) catalogLoader.loadCatalog();

Namespace namespace = Namespace.of("s3a_flink");
if (!catalog.namespaceExists(namespace)) {
    catalog.createNamespace(namespace);
}

TableIdentifier name = TableIdentifier.of(namespace, "icebergTBCloudTracking");
Schema schema = new Schema(0,
    Types.NestedField.required(1, "vin", Types.StringType.get()),
    Types.NestedField.required(2, "name", Types.StringType.get()),
    Types.NestedField.optional(3, "uuid", Types.StringType.get()),
    Types.NestedField.required(4, "channel", Types.StringType.get()),
    Types.NestedField.required(5, "run_scene", Types.StringType.get()),
    Types.NestedField.required(6, "timestamp", Types.TimestampType.withoutZone()),
    Types.NestedField.required(7, "rcv_timestamp", Types.TimestampType.withoutZone()),
    Types.NestedField.required(8, "raw", Types.StringType.get())
);

PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> props =
    ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.ORC.name());

Table table;
if (!catalog.tableExists(name)) {
    table = catalog.createTable(name, schema, spec, props);
} else {
    table = catalog.loadTable(name);
}
{code}
May I ask how to specify the input format when creating a table through the SDK? Thanks!
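For context, this is the variation of the creation code above that I plan to try next. It is only an untested sketch: the {{iceberg.engine.hive.enabled}} config key and the {{engine.hive.enabled}} table property are my assumption based on Iceberg's Hive integration docs, not a confirmed fix for this issue.
{code:java}
// Untested sketch (assumed config keys): ask Iceberg's HiveCatalog to register
// Hive StorageHandler / InputFormat metadata for the table in the metastore.
conf.set("iceberg.engine.hive.enabled", "true"); // catalog-wide switch on the Hadoop conf

Map<String, String> props = ImmutableMap.of(
        TableProperties.DEFAULT_FILE_FORMAT, FileFormat.ORC.name(),
        "engine.hive.enabled", "true"); // per-table switch

// Same create-or-load logic as above, just with the extra table property.
Table table = catalog.tableExists(name)
        ? catalog.loadTable(name)
        : catalog.createTable(name, schema, spec, props);
{code}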
> org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate the hadoop input format
> --------------------------------------------------------------------------------------------------
>
> Key: FLINK-28276
> URL: https://issues.apache.org/jira/browse/FLINK-28276
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Hive
> Affects Versions: 1.14.2
> Environment: Flink 1.14.2
> Hive 3.1.2
> Scala 2.12
> Iceberg 0.12.1
> Hadoop 3.2.1
> Reporter: wei
> Priority: Major
> Attachments: BA9CEEA0-BF38-4568-A7AD-66C68B19CF14.png, image-2022-06-30-09-37-44-705.png, image-2022-06-30-10-36-17-075.png
>
>
> When I read an Iceberg table stored on S3A through the Flink HiveCatalog, I get this error:
>
> {code:java}
> Exception in thread "main" org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate the hadoop input format
>     at org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createMRSplits(HiveSourceFileEnumerator.java:100)
>     at org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:71)
>     at org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:149)
>     at org.apache.flink.connectors.hive.HiveParallelismInference.logRunningTime(HiveParallelismInference.java:107)
>     at org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:95)
>     at org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:144)
>     at org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:114)
>     at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan.translateToPlanInternal(CommonExecTableSourceScan.java:106)
>     at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.java:49)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
>     at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.java:58)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at org.apache.flink.table.planner.delegation.BatchPlanner.$anonfun$translateToPlan$1(BatchPlanner.scala:82)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:81)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274)
>     at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:601)
>     at loshu.flink.hive.FlinkSQLHiveWriter.main(FlinkSQLHiveWriter.java:69)
> Caused by: java.lang.InstantiationException
>     at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>     at java.lang.Class.newInstance(Class.java:442)
>     at org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createMRSplits(HiveSourceFileEnumerator.java:98)
>     ... 30 more
> 16:33:36,767 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - Stopping s3a-file-system metrics system...
> 16:33:36,767 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - s3a-file-system metrics system stopped.
> 16:33:36,768 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - s3a-file-system metrics system shutdown complete.
> Process finished with exit code 1
> {code}
> My code is:
> {code:java}
> public class FlinkSQLHiveWriter {
>     private static org.apache.log4j.Logger log = Logger.getLogger(FlinkSQLHiveWriter.class);
>
>     public static void main(String[] args) throws Exception {
>         System.setProperty("HADOOP_USER_NAME", "root");
>         System.setProperty("hadoop.home.dir", "/opt/hadoop-3.2.1/");
>
>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>                 .inBatchMode()
>                 .build();
>         TableEnvironment tableEnv = TableEnvironment.create(settings);
>
>         String catalogName = "s3IcebergCatalog";
>         String defaultDatabase = "s3a_flink";
>         String hiveConfDir = "flink-cloud/src/main/resources";
>         HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
>         tableEnv.registerCatalog(catalogName, hive);
>         tableEnv.useCatalog(catalogName);
>         tableEnv.useDatabase(defaultDatabase);
>         System.out.println(hive.listDatabases());
>         System.out.println(hive.listTables(defaultDatabase));
>
>         String tableName = "icebergTBCloudTracking";
>         // set sql dialect as default, i.e. use Flink SQL
>         tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>         String sql = "select vin from " + tableName;
>         // String sql = "DESC " + tableName;
>         System.out.println(sql);
>         Table table = tableEnv.sqlQuery(sql);
>         table.execute();
>     }
> } {code}
> I can "show tables" or "describe tables", but when using "select * from
> table" the error occurs.
>