ycjunhua opened a new issue #3704:
URL: https://github.com/apache/hudi/issues/3704


   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://cwiki.apache.org/confluence/display/HUDI/FAQ)?
   yes
   - Join the mailing list to engage in conversations and get faster support at [email protected].
   
   - If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   
   Creating a Hudi COPY_ON_WRITE table through Flink SQL on a local `file://` path, inserting a few rows, and then selecting from the table fails with `org.apache.hudi.exception.HoodieException: Get table avro schema error`, caused by `InvalidTableException: Invalid Hoodie Table`.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Use the following code (a note on the key generator option follows the code):
   
   import com.yuou.flinkhudi.util.HudiHandler
   import com.yuou.flinkhudi.util.flink.FlinkUtils
   import com.yuou.flinkhudi.util.flink.dwd.source.ReaderCustomerMerchantInfoource.{createTablesql, env, tableEnv}
   import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter
   import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
   import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, table2RowDataStream}
   import org.apache.hudi.client.HoodieFlinkWriteClient
   import org.apache.hudi.config.HoodieCompactionConfig
   import org.apache.hudi.configuration.FlinkOptions
   
   /**
    * Generate country-code test data
    *
    * @author zhujh
    * @date 2021-09-18
    */
   object ContryAreaCodeData {
   
     private var env: StreamExecutionEnvironment = null
     private var tableEnv: StreamTableEnvironment = null
     private var createTablesql: String = ""
   
   
     /**
       * Generate country/area code data
      */
     def generateContryAreaCodeData(): Unit = {
   
       System.setProperty("HADOOP_USER_NAME", "dops")
       System.setProperty("hadoop.home.dir", 
"D:\\共享目录\\development\\SDK\\hadoop-2.7.7")
       env = FlinkUtils.getStreamExecutionEnviromentInstance()
       //assert(null == env)
       tableEnv = FlinkUtils.getStreamTableEnviromentInstance(env)
       // access flink configuration
       val configuration = tableEnv.getConfig().getConfiguration()
       // set low-level key-value options
       // set execution.result-mode=tableau;
       //set execution.checkpointing.interval=10sec;
       configuration.setString("execution.result-mode", "tableau")
       configuration.setString("execution.checkpointing.interval", "10sec")
       //assert(null == tableEnv)
    // create the table in the Table API environment
   
       createTablesql =
         """
           |CREATE TABLE configure_area(
            |  id int PRIMARY KEY NOT ENFORCED COMMENT 'ID, primary key',
            |  code varchar(128) COMMENT 'code',
            |  name varchar(128) COMMENT 'name',
            |  parent_code varchar(128) COMMENT 'parent code',
            |  level int COMMENT 'level [0-root, 1-country, 2-province, 3-city, 4-district, 5-street]',
            |  full_name varchar(255) COMMENT 'full name (province-city-district-street)',
            |  sap_code varchar(128) COMMENT 'SAP code',
            |  remark varchar(255) COMMENT 'remark',
            |  status varchar(10) COMMENT 'status: [ENABLE-enabled, DISABLE-disabled]',
            |  create_by varchar(50) COMMENT 'creator code',
            |  create_time TIMESTAMP(0) COMMENT 'creation time',
            |  update_by varchar(50) COMMENT 'updater code',
            |  update_time TIMESTAMP(0) COMMENT 'update time',
            |  is_deleted int COMMENT 'deleted flag: [0-no, 1-yes]'
            |)
            |WITH (
            |  'connector' = 'hudi',
            |  'path' = 'file:///d:/yuou/dev/ods/mmt-new/router_configure/configure_area',
            |  'table.type' = 'COPY_ON_WRITE', -- default is COPY_ON_WRITE
            |  'write.precombine.field' = 'id',
            |  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.NonpartitionedKeyGenerator',
            |  'metadata.compaction.delta_commits' = '1'
           |)
           """.stripMargin
   
       HudiHandler.createTable(tableEnv, createTablesql)
       tableEnv.executeSql(
         """
          |insert into configure_area values
           |(1,'1','中国','0',1,'中国','sap_area','','ENABLE','zhangsan',TIMESTAMP '2021-09-18 16:48:00','ayj',TIMESTAMP '2021-09-18 16:48:00',0)
           |""".stripMargin)
   
       tableEnv.executeSql("insert into configure_area 
values(2,'2','四川','1',2,'中国-四川','sap_area','','ENABLE','lisi',TIMESTAMP 
'2021-09-18 16:47:00','ayj',TIMESTAMP '2021-09-18 16:48:00',0)")
   
       tableEnv.executeSql("insert into configure_area values(3,'3','成都','2',3" 
+
             ",'中国-四川-成都','sap_area','','ENABLE','liuwu'" +
             ",TIMESTAMP '2021-09-18 16:49:00','liuwu',TIMESTAMP '2021-09-18 
16:49:00',0)")
       tableEnv.executeSql("insert into configure_area 
values(4,'4','锦江区','3',4" +
             ",'中国-四川-成都-锦江区','sap_area','','ENABLE','qiqi'" +
             ",TIMESTAMP '2021-09-18 16:50:00','qiqi',TIMESTAMP '2021-09-18 
16:50:00',0)")
   
   
       tableEnv.executeSql( "select * from configure_area").print()
   
   
       env.execute("ContryAreaCodeData")
     }
   
   
     def main(args: Array[String]): Unit = {
       generateContryAreaCodeData()
     }
   
   }
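   
   A note on the DDL above (an observation, not a confirmed root cause): in Hudi 0.9.0 the non-partitioned key generator class lives under the `keygen` package, i.e. `org.apache.hudi.keygen.NonpartitionedKeyGenerator`, while the WITH clause points at `org.apache.hudi.NonpartitionedKeyGenerator`. A minimal sketch with only that option changed; I am assuming the Flink writer actually reads `hoodie.datasource.write.keygenerator.class` in 0.9.0, which may not be the case:
   
   ```scala
   // Sketch only: same DDL as above, with the key generator class spelled out
   // under org.apache.hudi.keygen. Assumption: this option is consulted by the
   // Flink writer in 0.9.0.
   val createTableSqlFixedKeygen: String =
     """
       |CREATE TABLE configure_area(
       |  id int PRIMARY KEY NOT ENFORCED,
       |  -- ... remaining columns exactly as in the DDL above ...
       |  is_deleted int
       |)
       |WITH (
       |  'connector' = 'hudi',
       |  'path' = 'file:///d:/yuou/dev/ods/mmt-new/router_configure/configure_area',
       |  'table.type' = 'COPY_ON_WRITE',
       |  'write.precombine.field' = 'id',
       |  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
       |  'metadata.compaction.delta_commits' = '1'
       |)
       """.stripMargin
   ```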
   
   2. Run the above code to query the table; it fails with the following exception (see the note after the stack trace):
   Exception in thread "main" org.apache.hudi.exception.HoodieException: Get table avro schema error
        at org.apache.hudi.table.HoodieTableSource.getInputFormat(HoodieTableSource.java:322)
        at org.apache.hudi.table.HoodieTableSource.getInputFormat(HoodieTableSource.java:302)
        at org.apache.hudi.table.HoodieTableSource$1.produceDataStream(HoodieTableSource.java:190)
        at org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan.createSourceTransformation(CommonPhysicalTableSourceScan.scala:88)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:91)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:44)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:44)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:82)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:707)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1107)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
        at com.yuou.flinkhudi.util.flink.dwd.generate.ContryAreaCodeData$.generateContryAreaCodeData(ContryAreaCodeData.scala:91)
        at com.yuou.flinkhudi.util.flink.dwd.generate.ContryAreaCodeData$.main(ContryAreaCodeData.scala:99)
        at com.yuou.flinkhudi.util.flink.dwd.generate.ContryAreaCodeData.main(ContryAreaCodeData.scala)
   Caused by: org.apache.hudi.exception.InvalidTableException: Invalid Hoodie Table. file:/d:/yuou/dev/ods/mmt-new/router_configure/configure_area
        at org.apache.hudi.common.table.TableSchemaResolver.lambda$getTableParquetSchemaFromDataFile$0(TableSchemaResolver.java:88)
        at org.apache.hudi.common.util.Option.orElseThrow(Option.java:123)
        at org.apache.hudi.common.table.TableSchemaResolver.getTableParquetSchemaFromDataFile(TableSchemaResolver.java:88)
        at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaFromDataFile(TableSchemaResolver.java:153)
        at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchema(TableSchemaResolver.java:187)
        at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchema(TableSchemaResolver.java:163)
        at org.apache.hudi.table.HoodieTableSource.getInputFormat(HoodieTableSource.java:320)
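   
   For context on where this fails (a guess from the trace, not a confirmed diagnosis): the `Caused by` shows `TableSchemaResolver` finding no committed data file under the table path, and `executeSql` on an INSERT only submits the job and returns, so the SELECT can be planned before the first Hudi commit has completed. A minimal sketch, assuming the same `tableEnv` as in the code above, that blocks on each insert via `TableResult#await()` (available since Flink 1.12) before querying:
   
   ```scala
   import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
   
   // Sketch only: wait for each INSERT job to finish (and hence for its Hudi
   // commit to complete) before planning the SELECT. Assumes `tableEnv` is the
   // StreamTableEnvironment created in the reproduction code above.
   def insertThenQuery(tableEnv: StreamTableEnvironment): Unit = {
     // executeSql submits the INSERT asynchronously; await() blocks until the job finishes
     tableEnv.executeSql(
       """
         |insert into configure_area values
         |(1,'1','中国','0',1,'中国','sap_area','','ENABLE','zhangsan',TIMESTAMP '2021-09-18 16:48:00','ayj',TIMESTAMP '2021-09-18 16:48:00',0)
         |""".stripMargin).await()
   
     // only query once at least one commit exists under the table path
     tableEnv.executeSql("select * from configure_area").print()
   }
   ```
   
   If the SELECT still fails after the inserts have demonstrably finished, the problem is on the read side rather than a race with the writes.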
   
   3. The relevant part of the pom.xml:
    <name>FlinkOperateHudi</name>
   
       <properties>
           <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
           <maven.compiler.source>1.8</maven.compiler.source>
           <maven.compiler.target>1.8</maven.compiler.target>
           <flink.version>1.12.1</flink.version>
       </properties>
   
       <dependencies>
           <!-- Flink操作Hudi需要的包-->
           <dependency>
               <groupId>org.apache.hudi</groupId>
               <artifactId>hudi-flink-bundle_2.11</artifactId>
               <version>0.9.0</version>
           </dependency>
   
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-clients_2.11</artifactId>
               <version>${flink.version}</version>
           </dependency>
   
           <!-- java 开发Flink所需依赖 -->
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-java</artifactId>
               <version>${flink.version}</version>
           </dependency>
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-streaming-java_2.11</artifactId>
               <version>${flink.version}</version>
           </dependency>
   
           <!-- Flink 开发Scala需要导入以下依赖 -->
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-scala_2.11</artifactId>
               <version>${flink.version}</version>
               <!--<scope>provided</scope>-->
           </dependency>
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-streaming-scala_2.11</artifactId>
               <version>${flink.version}</version>
           </dependency>
   
           <!-- 读取hdfs文件需要jar包-->
           <dependency>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-client</artifactId>
           <version>2.9.2</version>
           </dependency>
           <!-- Flink 状态管理 RocksDB 依赖 -->
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
               <version>${flink.version}</version>
           </dependency>
   
           <!-- Flink Kafka连接器的依赖 -->
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-connector-kafka_2.11</artifactId>
               <version>${flink.version}</version>
           </dependency>
   
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-csv</artifactId>
               <version>1.12.1</version>
           </dependency>
           <!-- Flink  HDFS Sink-->
           <!--<dependency>-->
           <!--<groupId>org.apache.flink</groupId>-->
           <!--<artifactId>flink-connector-filesystem_2.11</artifactId>-->
           <!--<version>${flink.version}</version>-->
           <!--</dependency>-->
   
   
           <!-- Flink SQL & Table-->
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-table-planner_2.11</artifactId>
               <version>${flink.version}</version>
           </dependency>
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
               <version>${flink.version}</version>
           </dependency>
   
           <!-- Flink SQL中使用Blink 需要导入的包-->
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-table-planner-blink_2.11</artifactId>
               <version>${flink.version}</version>
           </dependency>
           <dependency>
               <groupId>org.projectlombok</groupId>
               <artifactId>lombok</artifactId>
               <version>1.16.16</version><!--版本号自己选一个就行-->
           </dependency>
       </dependencies>
   
   **Expected behavior**
   
   The `select * from configure_area` should return the four inserted rows instead of failing with the schema error.
   
   **Environment Description**
   
   * Hudi version : 0.9.0
   * Flink version : 1.12.1
   
   * Spark version :
   
   * Hive version :
   
   * Hadoop version :
   
   * Storage (HDFS/S3/GCS..) : local filesystem (file://)
   
   * Running on Docker? (yes/no) :no
   
   
   **Additional context**
   
   The table path is on the local Windows filesystem (`file:///d:/yuou/dev/ods/mmt-new/router_configure/configure_area`) and the job is run directly from the IDE.
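   
   One way to narrow down whether anything was written at all is to inspect the table path on disk after running the inserts. A small standalone check (plain JDK file listing, independent of Flink and Hudi; the path is the one from the DDL above):
   
   ```scala
   import java.nio.file.{Files, Paths}
   import scala.collection.JavaConverters._
   
   // Sketch only: list the Hudi metadata directory to see whether any commit
   // files (e.g. *.commit) were ever created under the table path.
   object InspectHoodieDir {
     def main(args: Array[String]): Unit = {
       val hoodieDir = Paths.get("d:/yuou/dev/ods/mmt-new/router_configure/configure_area/.hoodie")
       if (Files.exists(hoodieDir)) {
         Files.list(hoodieDir).iterator().asScala.foreach(p => println(p.getFileName))
       } else {
         println(s"$hoodieDir does not exist - the table metadata was never initialised")
       }
     }
   }
   ```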
   
   **Stacktrace**
   
   See the stack trace under step 2 of **To Reproduce** above.
   
   

