ycjunhua opened a new issue #3704: URL: https://github.com/apache/hudi/issues/3704
**_Tips before filing an issue_**

- Have you gone through our [FAQs](https://cwiki.apache.org/confluence/display/HUDI/FAQ)? yes
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.

**Describe the problem you faced**

Writing to a fresh COPY_ON_WRITE Hudi table from Flink SQL and then querying it in the same job fails with `org.apache.hudi.exception.HoodieException: Get table avro schema error`, caused by `InvalidTableException: Invalid Hoodie Table`.

**To Reproduce**

Steps to reproduce the behavior:

1. Use the following code (comments translated from Chinese):

```scala
import com.yuou.flinkhudi.util.HudiHandler
import com.yuou.flinkhudi.util.flink.FlinkUtils
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * Generates country area code test data.
 *
 * @author zhujh
 * @date 2021-09-18
 */
object ContryAreaCodeData {

  private var env: StreamExecutionEnvironment = null
  private var tableEnv: StreamTableEnvironment = null
  private var createTablesql: String = ""

  /** Generates the country area code data. */
  def generateContryAreaCodeData(): Unit = {
    System.setProperty("HADOOP_USER_NAME", "dops")
    System.setProperty("hadoop.home.dir", "D:\\共享目录\\development\\SDK\\hadoop-2.7.7")
    env = FlinkUtils.getStreamExecutionEnviromentInstance()
    tableEnv = FlinkUtils.getStreamTableEnviromentInstance(env)

    // Access the Flink configuration and set low-level key-value options,
    // equivalent to:
    //   set execution.result-mode=tableau;
    //   set execution.checkpointing.interval=10sec;
    val configuration = tableEnv.getConfig().getConfiguration()
    configuration.setString("execution.result-mode", "tableau")
    configuration.setString("execution.checkpointing.interval", "10sec")

    // Create the table in the table environment
    createTablesql =
      """
        |CREATE TABLE configure_area(
        |  id int PRIMARY KEY NOT ENFORCED COMMENT 'ID, primary key',
        |  code varchar(128) COMMENT 'code',
        |  name varchar(128) COMMENT 'name',
        |  parent_code varchar(128) COMMENT 'parent code',
        |  level int COMMENT 'level [0-root, 1-country, 2-province, 3-city, 4-district, 5-street]',
        |  full_name varchar(255) COMMENT 'full name (province-city-district-street)',
        |  sap_code varchar(128) COMMENT 'SAP code',
        |  remark varchar(255) COMMENT 'remark',
        |  status varchar(10) COMMENT 'status [ENABLE, DISABLE]',
        |  create_by varchar(50) COMMENT 'creator code',
        |  create_time TIMESTAMP(0) COMMENT 'creation time',
        |  update_by varchar(50) COMMENT 'updater code',
        |  update_time TIMESTAMP(0) COMMENT 'update time',
        |  is_deleted int COMMENT 'deleted flag [0-no, 1-yes]'
        |)
        |WITH (
        |  'connector' = 'hudi',
        |  'path' = 'file:///d:/yuou/dev/ods/mmt-new/router_configure/configure_area',
        |  'table.type' = 'COPY_ON_WRITE', -- default is COPY_ON_WRITE
        |  'write.precombine.field' = 'id',
        |  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.NonpartitionedKeyGenerator',
        |  'metadata.compaction.delta_commits' = '1'
        |)
      """.stripMargin
    HudiHandler.createTable(tableEnv, createTablesql)

    tableEnv.executeSql(
      """
        |insert into configure_area values
        |(1,'1','中国','0',1,'中国','sap_area','','ENABLE','zhangsan',TIMESTAMP '2021-09-18 16:48:00','ayj',TIMESTAMP '2021-09-18 16:48:00',0)
        |""".stripMargin)
    tableEnv.executeSql("insert into configure_area values(2,'2','四川','1',2,'中国-四川','sap_area','','ENABLE','lisi',TIMESTAMP '2021-09-18 16:47:00','ayj',TIMESTAMP '2021-09-18 16:48:00',0)")
    tableEnv.executeSql("insert into configure_area values(3,'3','成都','2',3" +
      ",'中国-四川-成都','sap_area','','ENABLE','liuwu'" +
      ",TIMESTAMP '2021-09-18 16:49:00','liuwu',TIMESTAMP '2021-09-18 16:49:00',0)")
    tableEnv.executeSql("insert into configure_area values(4,'4','锦江区','3',4" +
      ",'中国-四川-成都-锦江区','sap_area','','ENABLE','qiqi'" +
      ",TIMESTAMP '2021-09-18 16:50:00','qiqi',TIMESTAMP '2021-09-18 16:50:00',0)")

    tableEnv.executeSql("select * from configure_area").print()
    env.execute("ContryAreaCodeData")
  }

  def main(args: Array[String]): Unit = {
    generateContryAreaCodeData()
  }
}
```

2. Run the above code. The query of the table fails with:

```
Exception in thread "main" org.apache.hudi.exception.HoodieException: Get table avro schema error
	at org.apache.hudi.table.HoodieTableSource.getInputFormat(HoodieTableSource.java:322)
	at org.apache.hudi.table.HoodieTableSource.getInputFormat(HoodieTableSource.java:302)
	at org.apache.hudi.table.HoodieTableSource$1.produceDataStream(HoodieTableSource.java:190)
	at org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan.createSourceTransformation(CommonPhysicalTableSourceScan.scala:88)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:91)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:44)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:44)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:82)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
	at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
	at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:707)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1107)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
	at com.yuou.flinkhudi.util.flink.dwd.generate.ContryAreaCodeData$.generateContryAreaCodeData(ContryAreaCodeData.scala:91)
	at com.yuou.flinkhudi.util.flink.dwd.generate.ContryAreaCodeData$.main(ContryAreaCodeData.scala:99)
	at com.yuou.flinkhudi.util.flink.dwd.generate.ContryAreaCodeData.main(ContryAreaCodeData.scala)
Caused by: org.apache.hudi.exception.InvalidTableException: Invalid Hoodie Table.
```
```
file:/d:/yuou/dev/ods/mmt-new/router_configure/configure_area
	at org.apache.hudi.common.table.TableSchemaResolver.lambda$getTableParquetSchemaFromDataFile$0(TableSchemaResolver.java:88)
	at org.apache.hudi.common.util.Option.orElseThrow(Option.java:123)
	at org.apache.hudi.common.table.TableSchemaResolver.getTableParquetSchemaFromDataFile(TableSchemaResolver.java:88)
	at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaFromDataFile(TableSchemaResolver.java:153)
	at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchema(TableSchemaResolver.java:187)
	at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchema(TableSchemaResolver.java:163)
	at org.apache.hudi.table.HoodieTableSource.getInputFormat(HoodieTableSource.java:320)
```

3. The project POM (comments translated from Chinese):

```xml
<name>FlinkOperateHudi</name>
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.12.1</flink.version>
</properties>
<dependencies>
    <!-- Dependencies needed for Flink to operate on Hudi -->
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink-bundle_2.11</artifactId>
        <version>0.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Dependencies for developing Flink with Java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Dependencies for developing Flink with Scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Jars needed to read HDFS files -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.9.2</version>
    </dependency>
    <!-- RocksDB state backend for Flink state management -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>1.12.1</version>
    </dependency>
    <!-- Flink HDFS Sink -->
    <!--<dependency>-->
    <!--<groupId>org.apache.flink</groupId>-->
    <!--<artifactId>flink-connector-filesystem_2.11</artifactId>-->
    <!--<version>${flink.version}</version>-->
    <!--</dependency>-->
    <!-- Flink SQL & Table -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Blink planner for Flink SQL -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.16</version><!-- pick any version -->
    </dependency>
</dependencies>
```

**Expected behavior**

The four inserted rows are committed to the Hudi table and the final `SELECT` prints them without error.

**Environment Description**

* Hudi version : 0.9
* Flink version : 1.12.1
* Spark version :
* Hive version :
* Hadoop version :
* Storage (HDFS/S3/GCS..) : local filesystem (`file://`)
* Running on Docker? (yes/no) : no

**Additional context**

The job runs locally on Windows and writes the table to the local filesystem.
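One detail that may matter (my assumption, not verified against the report): in Flink 1.12, `TableEnvironment.executeSql("INSERT ...")` submits the job asynchronously and returns a `TableResult` immediately, so the final `SELECT` can scan the table path before any Hudi commit has produced a data file, which would match the `InvalidTableException`. Calling `.await()` on each insert's `TableResult` before querying would rule this out. The blocking pattern, sketched with `CompletableFuture` standing in for the async job handles so the snippet is self-contained:

```scala
import java.util.concurrent.CompletableFuture

object AwaitInsertsSketch {
  // Stand-in for tableEnv.executeSql("INSERT ..."): the call returns
  // immediately with a handle to a job that commits some time later.
  def submitInsert(id: Int): CompletableFuture[Int] =
    CompletableFuture.supplyAsync { () =>
      Thread.sleep(50) // simulated commit latency
      id
    }

  def main(args: Array[String]): Unit = {
    val handles = (1 to 4).map(submitInsert)
    // Equivalent of TableResult.await() on each insert: block until
    // every write job has finished before issuing the SELECT.
    val committedIds = handles.map(_.join())
    println(committedIds.mkString(",")) // prints 1,2,3,4
  }
}
```

In the actual job this would be `tableEnv.executeSql("insert into configure_area values(...)").await()` for each of the four inserts, before the `select * from configure_area`.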
