[ 
https://issues.apache.org/jira/browse/FLINK-28276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17560806#comment-17560806
 ] 

wei commented on FLINK-28276:
-----------------------------

Thanks! I think this is the cause of the problem. I create table by flink sdk 
as follow:
{code:java}
Map<String, String> properties = new HashMap<>();
        properties.put("type", "iceberg");
        properties.put("clients", "5");
        properties.put("property-version", "1");
        properties.put("warehouse", "s3a://warehouse/");
        properties.put("catalog-type", "hive");
        properties.put("uri", "thrift://hiveserver:9083");


        Configuration conf = new Configuration();
        conf.set("fs.s3a.connection.ssl.enabled", "false");
        conf.set("fs.s3a.endpoint", "http://mys3";);
        conf.set("fs.s3a.access.key", "lkmyCR8it8zhKFh1Shzo0PpTfu4IpTzR");
        conf.set("fs.s3a.secret.key", "s0m1yB1LmnNdAuQQh3NLffPtZY9g18JN");
        conf.set("fs.s3a.region", "us-east-1");
        conf.set("fs.s3a.path.style.access", "true");
        conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        conf.set("fs.s3a.fast.upload", "true");
        
conf.set("execution.checkpointing.checkpoints-after-tasks-finish.enabled", 
"true");

        String HIVE_CATALOG = "s3IcebergCatalog";
        CatalogLoader catalogLoader = CatalogLoader.hive(HIVE_CATALOG, conf, 
properties);
        HiveCatalog catalog = (HiveCatalog) catalogLoader.loadCatalog();

        Namespace namespace = Namespace.of("s3a_flink");
        if (!catalog.namespaceExists(namespace))
            catalog.createNamespace(namespace);

        TableIdentifier name =
                TableIdentifier.of(namespace, "icebergTBCloudTracking");

        Schema schema = new Schema(0,
                Types.NestedField.required(1, "vin", Types.StringType.get()),
                Types.NestedField.required(2, "name", Types.StringType.get()),
                Types.NestedField.optional(3, "uuid", Types.StringType.get()),
                Types.NestedField.required(4, "channel", 
Types.StringType.get()),
                Types.NestedField.required(5, "run_scene", 
Types.StringType.get()),
                Types.NestedField.required(6, "timestamp", 
Types.TimestampType.withoutZone()),
                Types.NestedField.required(7, "rcv_timestamp", 
Types.TimestampType.withoutZone()),
                Types.NestedField.required(8, "raw", Types.StringType.get())
                );

         PartitionSpec spec = PartitionSpec.unpartitioned();

        Map<String, String> props =
                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, 
FileFormat.ORC.name());

        Table table = null;
 
        if (!catalog.tableExists(name))
            table = catalog.createTable(name, schema, spec, props);
        else
            table = catalog.loadTable(name); {code}
May I ask how to specify inputformat when creating table by sdk?  Thanks!

> org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate 
> the hadoop input format
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-28276
>                 URL: https://issues.apache.org/jira/browse/FLINK-28276
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>    Affects Versions: 1.14.2
>         Environment: Flink 1.14.2
> Hive 3.1.2
> Scala 2.12
> Iceberg 0.12.1
> Hadoop 3.2.1
>            Reporter: wei
>            Priority: Major
>         Attachments: BA9CEEA0-BF38-4568-A7AD-66C68B19CF14.png, 
> image-2022-06-30-09-37-44-705.png, image-2022-06-30-10-36-17-075.png
>
>
> When I read Iceberg tables using Flink HiveCatalog, based on S3A,  I got this 
> error:
>  
> {code:java}
> //代码占位符
> Exception in thread "main" 
> org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate 
> the hadoop input format
>     at 
> org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createMRSplits(HiveSourceFileEnumerator.java:100)
>     at 
> org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:71)
>     at 
> org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:149)
>     at 
> org.apache.flink.connectors.hive.HiveParallelismInference.logRunningTime(HiveParallelismInference.java:107)
>     at 
> org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:95)
>     at 
> org.apache.flink.connectors.hive.HiveTableSource.getDataStream(HiveTableSource.java:144)
>     at 
> org.apache.flink.connectors.hive.HiveTableSource$1.produceDataStream(HiveTableSource.java:114)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan.translateToPlanInternal(CommonExecTableSourceScan.java:106)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.java:49)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.java:58)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>     at 
> org.apache.flink.table.planner.delegation.BatchPlanner.$anonfun$translateToPlan$1(BatchPlanner.scala:82)
>     at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:81)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274)
>     at 
> org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:601)
>     at loshu.flink.hive.FlinkSQLHiveWriter.main(FlinkSQLHiveWriter.java:69)
> Caused by: java.lang.InstantiationException
>     at 
> sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>     at java.lang.Class.newInstance(Class.java:442)
>     at 
> org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createMRSplits(HiveSourceFileEnumerator.java:98)
>     ... 30 more
> 16:33:36,767 INFO  org.apache.hadoop.metrics2.impl.MetricsSystemImpl          
>   [] - Stopping s3a-file-system metrics system...
> 16:33:36,767 INFO  org.apache.hadoop.metrics2.impl.MetricsSystemImpl          
>   [] - s3a-file-system metrics system stopped.
> 16:33:36,768 INFO  org.apache.hadoop.metrics2.impl.MetricsSystemImpl          
>   [] - s3a-file-system metrics system shutdown complete.Process finished with 
> exit code 1
>  {code}
> My code is:
> {code:java}
> //代码占位符
> public class FlinkSQLHiveWriter {
>     private static org.apache.log4j.Logger log = 
> Logger.getLogger(FlinkSQLHiveWriter.class);
>     public static void main(String[] args) throws Exception {
>         System.setProperty("HADOOP_USER_NAME", "root");
>         System.setProperty("hadoop.home.dir", "/opt/hadoop-3.2.1/");
>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>                 .inBatchMode()
>                 .build();
>         TableEnvironment tableEnv = TableEnvironment.create(settings);
>         String catalogName = "s3IcebergCatalog";
>         String defaultDatabase = "s3a_flink";
>         String hiveConfDir = "flink-cloud/src/main/resources"; 
>         HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, 
> hiveConfDir);
>         tableEnv.registerCatalog(catalogName, hive);
>         tableEnv.useCatalog(catalogName);
>         tableEnv.useDatabase(defaultDatabase);
>         System.out.println(hive.listDatabases());
>         System.out.println(hive.listTables(defaultDatabase));
>         String tableName = "icebergTBCloudTracking";
>         // set sql dialect as default, means using flink sql.
>         tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>         String sql = "select vin from " + tableName;
> //        String sql = "DESC " + tableName;
>         System.out.println(sql);
>         Table table = tableEnv.sqlQuery(sql);
>         table.execute();
>     }
> } {code}
> I can "show tables" or "describe tables", but when using "select * from 
> table" the error occurs.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to