t822876884 opened a new issue #3796:
URL: https://github.com/apache/hudi/issues/3796


Hudi 0.9.0
Flink 1.12.2

The Flink streaming job below reads JSON events from Kafka, keeps the records with `type == "yarn"`, and writes them into a MERGE_ON_READ Hudi table. After running for a while, compaction fails with the `NoClassDefFoundError` shown in the log excerpt further down.

```java
// Snippet from the job class. Imports (Flink streaming/table API, fastjson) and
// the user-defined YarnDataEntity, DateUtil, kafkaProperties() and the
// YARN_CKP_PATH / KAFKA_TOPIC / YARN_DATA_PATH constants are defined elsewhere.
public static void main(String[] args) {
    // Execution environment: FS state backend, exactly-once checkpoints every 60s
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new FsStateBackend(YARN_CKP_PATH));
    env.enableCheckpointing(60000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.setParallelism(1);

    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

    FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>(KAFKA_TOPIC, new SimpleStringSchema(), kafkaProperties());
    consumer.setStartFromTimestamp(1633795200000L);

    // Source: raw JSON strings from Kafka
    DataStreamSource<String> yarnDS = env
            .addSource(consumer)
            .setParallelism(8);

    // Keep only events with type == "yarn", map the "data" payload to
    // YarnDataEntity, and derive the partition field dt from startedTime
    DataStream<YarnDataEntity> dataDs = yarnDS
            .filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String value) throws Exception {
                    String type = JSONObject.parseObject(value).getString("type");
                    return "yarn".equals(type);
                }
            }).setParallelism(4)
            .map(new MapFunction<String, YarnDataEntity>() {
                @Override
                public YarnDataEntity map(String value) throws Exception {
                    String data = JSONObject.parseObject(value).getString("data");
                    YarnDataEntity yarnDataEntity = JSONObject.parseObject(data, YarnDataEntity.class);
                    yarnDataEntity.setDt(DateUtil.convertTimeByLong(yarnDataEntity.getStartedTime()));
                    return yarnDataEntity;
                }
            }).setParallelism(8);

    Table dataDsYarn = tableEnvironment.fromDataStream(dataDs);

    //Table result = tableEnvironment.sqlQuery("SELECT * FROM " + dataDsYarn);
    //tableEnvironment.toAppendStream(result, YarnDataEntity.class).print();

    // Hudi MOR sink, partitioned by dt, with streaming read enabled
    tableEnvironment.executeSql("CREATE TABLE big_data_analyse_yarn(" +
            " allocatedMB INT," +
            " allocatedVCores INT," +
            " amContainerLogs VARCHAR(200)," +
            " amHostHttpAddress VARCHAR(200)," +
            " amNodeLabelExpression VARCHAR(200)," +
            " amRPCAddress VARCHAR(20)," +
            " appNodeLabelExpression VARCHAR(200)," +
            " applicationTags VARCHAR(200)," +
            " applicationType VARCHAR(20)," +
            " clusterId BIGINT," +
            " clusterUsagePercentage FLOAT," +
            " diagnostics VARCHAR(200)," +
            " dt VARCHAR(20)," +
            " elapsedTime BIGINT," +
            " finalStatus VARCHAR(200)," +
            " finishedTime BIGINT," +
            " id VARCHAR(200)," +
            " logAggregationStatus VARCHAR(200)," +
            " memorySeconds BIGINT," +
            " name VARCHAR(200)," +
            " numAMContainerPreempted INT," +
            " numNonAMContainerPreempted INT," +
            " preemptedResourceMB INT," +
            " preemptedResourceVCores BIGINT," +
            " priority VARCHAR(200)," +
            " progress FLOAT," +
            " queue VARCHAR(200)," +
            " queueUsagePercentage FLOAT," +
            " runningContainers INT," +
            " startedTime BIGINT," +
            " `state` VARCHAR(200)," +
            " trackingUI VARCHAR(200)," +
            " trackingUrl VARCHAR(200)," +
            " unmanagedApplication BOOLEAN," +
            " `user` VARCHAR(20)," +
            " vcoreSeconds BIGINT" +
            ")" +
            " PARTITIONED BY (dt)" +
            " WITH (" +
            "  'connector' = 'hudi'," +
            "  'path' = '" + YARN_DATA_PATH + "'," +
            "  'write.tasks' = '8'," +
            "  'read.streaming.enabled' = 'true'," +
            "  'table.type' = 'MERGE_ON_READ'," +
            "  'read.streaming.check-interval' = '30'," +
            "  'write.precombine.field' = 'dt'," +
            "  'hoodie.datasource.write.operation' = 'insert'," +
            "  'hoodie.datasource.write.recordkey.field' = 'id'" +
            " )");

    tableEnvironment.executeSql("insert into big_data_analyse_yarn select * from " + dataDsYarn);
}
```
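
For completeness: the `kafkaProperties()` helper is referenced but not shown in the report. A minimal hypothetical sketch of what such a helper returns is below; the broker address and group id are placeholders, not values from the original issue:

```java
import java.util.Properties;

// Hypothetical helper assumed by the snippet above; the real broker list,
// group id, and any security settings are not part of the original report.
private static Properties kafkaProperties() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka-broker:9092"); // placeholder
    props.setProperty("group.id", "yarn-data-consumer");         // placeholder
    return props;
}
```

After the job has been running for a few minutes, the TaskManager log shows: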
   
```
org.apache.kafka.common.errors.DisconnectException: null
2021-10-14 14:11:58,232 INFO  org.apache.hudi.sink.StreamWriteFunction [] - No data to write in subtask [3] for instant [20211014141158]
2021-10-14 14:12:20,232 INFO  org.apache.hudi.sink.StreamWriteFunction [] - No data to write in subtask [3] for instant [20211014141201]
2021-10-14 14:13:20,591 INFO  org.apache.hudi.sink.StreamWriteFunction [] - No data to write in subtask [3] for instant [20211014141224]
2021-10-14 14:13:25,829 INFO  org.apache.hudi.sink.compact.CompactFunction [] - Executor executes action [Execute compaction for instant 20211014141222 from task 3] success!
2021-10-14 14:13:25,829 INFO  org.apache.hudi.sink.compact.CompactFunction [] - Executor executes action [Execute compaction for instant 20211014141222 from task 4] success!
2021-10-14 14:13:47,082 INFO  org.apache.hudi.sink.compact.CompactFunction [] - Executor executes action [Execute compaction for instant 20211014141222 from task 9] success!
2021-10-14 14:14:19,166 INFO  org.apache.hudi.sink.StreamWriteFunction [] - No data to write in subtask [3] for instant [20211014141326]
2021-10-14 14:15:19,821 INFO  org.apache.hudi.sink.StreamWriteFunction [] - No data to write in subtask [3] for instant [20211014141427]
2021-10-14 14:16:19,527 INFO  org.apache.hudi.sink.StreamWriteFunction [] - No data to write in subtask [3] for instant [20211014141525]
2021-10-14 14:17:20,115 INFO  org.apache.hudi.sink.StreamWriteFunction [] - No data to write in subtask [3] for instant [20211014141627]
2021-10-14 14:18:19,149 INFO  org.apache.hudi.sink.StreamWriteFunction [] - No data to write in subtask [3] for instant [20211014141723]
2021-10-14 14:18:22,363 ERROR org.apache.hudi.sink.compact.CompactFunction [] - Executor executes action [Execute compaction for instant 20211014141722 from task 9] error
java.lang.NoClassDefFoundError: org/apache/hadoop/mapreduce/lib/input/FileInputFormat
	at java.lang.ClassLoader.defineClass1(Native Method) ~[?:1.8.0_252]
	at java.lang.ClassLoader.defineClass(ClassLoader.java:756) ~[?:1.8.0_252]
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) ~[?:1.8.0_252]
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:468) ~[?:1.8.0_252]
	at java.net.URLClassLoader.access$100(URLClassLoader.java:74) ~[?:1.8.0_252]
	at java.net.URLClassLoader$1.run(URLClassLoader.java:369) ~[?:1.8.0_252]
	at java.net.URLClassLoader$1.run(URLClassLoader.java:363) ~[?:1.8.0_252]
	at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_252]
	at java.net.URLClassLoader.findClass(URLClassLoader.java:362) ~[?:1.8.0_252]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_252]
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) ~[?:1.8.0_252]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_252]
	at org.apache.hudi.org.apache.parquet.HadoopReadOptions$Builder.<init>(HadoopReadOptions.java:95) ~[k-bdg-stream.jar:?]
	at org.apache.hudi.org.apache.parquet.HadoopReadOptions.builder(HadoopReadOptions.java:79) ~[k-bdg-stream.jar:?]
	at org.apache.hudi.org.apache.parquet.hadoop.ParquetReader$Builder.<init>(ParquetReader.java:198) ~[k-bdg-stream.jar:?]
	at org.apache.hudi.org.apache.parquet.avro.AvroParquetReader$Builder.<init>(AvroParquetReader.java:107) ~[k-bdg-stream.jar:?]
	at org.apache.hudi.org.apache.parquet.avro.AvroParquetReader$Builder.<init>(AvroParquetReader.java:99) ~[k-bdg-stream.jar:?]
	at org.apache.hudi.org.apache.parquet.avro.AvroParquetReader.builder(AvroParquetReader.java:48) ~[k-bdg-stream.jar:?]
	at org.apache.hudi.io.storage.HoodieParquetReader.getRecordIterator(HoodieParquetReader.java:65) ~[k-bdg-stream.jar:?]
	at org.apache.hudi.table.action.commit.FlinkMergeHelper.runMerge(FlinkMergeHelper.java:89) ~[k-bdg-stream.jar:?]
	at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdateInternal(HoodieFlinkCopyOnWriteTable.java:338) ~[k-bdg-stream.jar:?]
	at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdate(HoodieFlinkCopyOnWriteTable.java:329) ~[k-bdg-stream.jar:?]
	at org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor.compact(HoodieFlinkMergeOnReadTableCompactor.java:146) ~[k-bdg-stream.jar:?]
	at org.apache.hudi.table.action.compact.FlinkCompactHelpers.compact(FlinkCompactHelpers.java:95) ~[k-bdg-stream.jar:?]
	at org.apache.hudi.sink.compact.CompactFunction.doCompaction(CompactFunction.java:101) ~[k-bdg-stream.jar:?]
	at org.apache.hudi.sink.compact.CompactFunction.lambda$processElement$0(CompactFunction.java:91) ~[k-bdg-stream.jar:?]
	at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$execute$0(NonThrownExecutor.java:67) ~[k-bdg-stream.jar:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapreduce.lib.input.FileInputFormat
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_252]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_252]
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) ~[?:1.8.0_252]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_252]
	... 30 more
2021-10-14 14:18:54,783 INFO  org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task hoodie_stream_write (4/8)#0 (0f52575916f8c6ea1f60f8d7cc213bba).
2021-10-14 14:18:54,784 INFO  org.apache.flink.runtime.taskmanager.Task [] - hoodie_stream_write (4/8)#0 (0f52575916f8c6ea1f60f8d7cc213bba) switched from RUNNING to CANCELING.
2021-10-14 14:18:54,784 INFO  org.apache.flink.runtime.taskmanager.Task [] - Triggering cancellation of task code hoodie_stream_write (4/8)#0 (0f52575916f8c6ea1f60f8d7cc213bba).
2021-10-14 14:18:54,786 INFO  org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task hoodie_stream_write (5/8)#0 (40e056157180fbedb677e193c9f2f921).
```
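
The `Caused by: ClassNotFoundException` points at `org.apache.hadoop.mapreduce.lib.input.FileInputFormat`, which ships in `hadoop-mapreduce-client-core`. As the stack trace shows, compaction opens Parquet base files through the shaded `HadoopReadOptions`/`AvroParquetReader` path, which needs the Hadoop MapReduce classes at runtime, so that jar appears to be missing from the job classpath. On YARN the usual remedy is to make Hadoop visible to Flink (e.g. `export HADOOP_CLASSPATH=$(hadoop classpath)` on the submitting host) or to bundle the missing dependency with the job jar. A purely illustrative probe (not part of Hudi or Flink) can confirm whether the class is visible on a given classpath:

```java
import java.security.CodeSource;

// Illustrative classpath probe: run with the same classpath as the Flink job
// to check whether the Hadoop MapReduce classes that Hudi compaction needs
// are actually visible, and if so, which jar they come from.
public class HadoopClasspathProbe {
    public static void main(String[] args) {
        String clazz = "org.apache.hadoop.mapreduce.lib.input.FileInputFormat";
        try {
            CodeSource src = Class.forName(clazz).getProtectionDomain().getCodeSource();
            System.out.println(clazz + " found in "
                    + (src != null ? src.getLocation() : "the bootstrap classpath"));
        } catch (ClassNotFoundException e) {
            System.out.println(clazz + " is NOT on the classpath; add"
                    + " hadoop-mapreduce-client-core or set HADOOP_CLASSPATH.");
        }
    }
}
```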

