t822876884 opened a new issue #3796:
URL: https://github.com/apache/hudi/issues/3796
Hudi 0.9.0
Flink 1.12.2
```java
public static void main(String[] args) {
//ENV
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend(YARN_CKP_PATH));
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setParallelism(1);
EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner()
.inStreamingMode().build();
StreamTableEnvironment tableEnvironment =
StreamTableEnvironment.create(env, settings);
FlinkKafkaConsumer<String> consumer = new
FlinkKafkaConsumer<String>(KAFKA_TOPIC, new SimpleStringSchema(),
kafkaProperties());
consumer.setStartFromTimestamp(1633795200000L);
//SOURCE
DataStreamSource<String> yarnDS = env
.addSource(consumer)
.setParallelism(8);
DataStream<YarnDataEntity> dataDs = yarnDS.filter(new
FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
String type =
JSONObject.parseObject(value).getString("type");
if (("yarn").equals(type)) {
return true;
}
return false;
}
}).setParallelism(4)
.map(new MapFunction<String, YarnDataEntity>() {
@Override
public YarnDataEntity map(String value) throws Exception {
String data =
JSONObject.parseObject(value).getString("data");
YarnDataEntity yarnDataEntities =
JSONObject.parseObject(data, YarnDataEntity.class);
yarnDataEntities.setDt(DateUtil.convertTimeByLong(yarnDataEntities.getStartedTime()));
return yarnDataEntities;
}
}).setParallelism(8);
Table dataDsYarn = tableEnvironment.fromDataStream(dataDs);
//Table result = tableEnvironment.sqlQuery("SELECT * FROM " +
dataDsYarn);
//tableEnvironment.toAppendStream(result,
YarnDataEntity.class).print();
tableEnvironment.executeSql("CREATE TABLE big_data_analyse_yarn(" +
" allocatedMB INT," +
" allocatedVCores INT," +
" amContainerLogs VARCHAR(200)," +
" amHostHttpAddress VARCHAR(200)," +
" amNodeLabelExpression VARCHAR(200)," +
" amRPCAddress VARCHAR(20)," +
" appNodeLabelExpression VARCHAR(200)," +
" applicationTags VARCHAR(200)," +
" applicationType VARCHAR(20)," +
" clusterId BIGINT," +
" clusterUsagePercentage FLOAT," +
" diagnostics VARCHAR(200)," +
" dt VARCHAR(20)," +
" elapsedTime BIGINT, " +
" finalStatus VARCHAR(200)," +
" finishedTime BIGINT," +
" id VARCHAR(200)," +
" logAggregationStatus VARCHAR(200)," +
" memorySeconds BIGINT, " +
" name VARCHAR(200)," +
" numAMContainerPreempted INT, " +
" numNonAMContainerPreempted INT, " +
" preemptedResourceMB int," +
" preemptedResourceVCores BIGINT, " +
" priority VARCHAR(200)," +
" progress FLOAT, " +
" queue VARCHAR(200)," +
" queueUsagePercentage FLOAT, " +
" runningContainers INT, " +
" startedTime BIGINT," +
" `state` VARCHAR(200)," +
" trackingUI VARCHAR(200)," +
" trackingUrl VARCHAR(200)," +
" unmanagedApplication boolean," +
" `user` VARCHAR(20)," +
" vcoreSeconds BIGINT" +
")" +
" PARTITIONED BY (dt)" +
"WITH (" +
" 'connector' = 'hudi'," +
" 'path' = '"+ YARN_DATA_PATH +"'," +
" 'write.tasks' = '8'," +
" 'read.streaming.enabled'= 'true', " +
" 'table.type' = 'MERGE_ON_READ', " +
" 'read.streaming.check-interval' = '30'," +
" 'write.precombine.field' = 'dt'," +
" 'hoodie.datasource.write.operation' = 'insert'," +
" 'hoodie.datasource.write.recordkey.field' = 'id' " +
" )");
tableEnvironment.executeSql("insert into big_data_analyse_yarn
select * from " + dataDsYarn);
}
```
```
org.apache.kafka.common.errors.DisconnectException: null
2021-10-14 14:11:58,232 INFO org.apache.hudi.sink.StreamWriteFunction
[] - No data to write in subtask [3] for instant [20211014141158]
2021-10-14 14:12:20,232 INFO org.apache.hudi.sink.StreamWriteFunction
[] - No data to write in subtask [3] for instant [20211014141201]
2021-10-14 14:13:20,591 INFO org.apache.hudi.sink.StreamWriteFunction
[] - No data to write in subtask [3] for instant [20211014141224]
2021-10-14 14:13:25,829 INFO org.apache.hudi.sink.compact.CompactFunction
[] - Executor executes action [Execute compaction for instant
20211014141222 from task 3] success!
2021-10-14 14:13:25,829 INFO org.apache.hudi.sink.compact.CompactFunction
[] - Executor executes action [Execute compaction for instant
20211014141222 from task 4] success!
2021-10-14 14:13:47,082 INFO org.apache.hudi.sink.compact.CompactFunction
[] - Executor executes action [Execute compaction for instant
20211014141222 from task 9] success!
2021-10-14 14:14:19,166 INFO org.apache.hudi.sink.StreamWriteFunction
[] - No data to write in subtask [3] for instant [20211014141326]
2021-10-14 14:15:19,821 INFO org.apache.hudi.sink.StreamWriteFunction
[] - No data to write in subtask [3] for instant [20211014141427]
2021-10-14 14:16:19,527 INFO org.apache.hudi.sink.StreamWriteFunction
[] - No data to write in subtask [3] for instant [20211014141525]
2021-10-14 14:17:20,115 INFO org.apache.hudi.sink.StreamWriteFunction
[] - No data to write in subtask [3] for instant [20211014141627]
2021-10-14 14:18:19,149 INFO org.apache.hudi.sink.StreamWriteFunction
[] - No data to write in subtask [3] for instant [20211014141723]
2021-10-14 14:18:22,363 ERROR org.apache.hudi.sink.compact.CompactFunction
[] - Executor executes action [Execute compaction for instant
20211014141722 from task 9] error
java.lang.NoClassDefFoundError:
org/apache/hadoop/mapreduce/lib/input/FileInputFormat
at java.lang.ClassLoader.defineClass1(Native Method) ~[?:1.8.0_252]
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
~[?:1.8.0_252]
at
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
~[?:1.8.0_252]
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
~[?:1.8.0_252]
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
~[?:1.8.0_252]
at java.net.URLClassLoader$1.run(URLClassLoader.java:369) ~[?:1.8.0_252]
at java.net.URLClassLoader$1.run(URLClassLoader.java:363) ~[?:1.8.0_252]
at java.security.AccessController.doPrivileged(Native Method)
~[?:1.8.0_252]
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
~[?:1.8.0_252]
at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_252]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
~[?:1.8.0_252]
at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_252]
at
org.apache.hudi.org.apache.parquet.HadoopReadOptions$Builder.<init>(HadoopReadOptions.java:95)
~[k-bdg-stream.jar:?]
at
org.apache.hudi.org.apache.parquet.HadoopReadOptions.builder(HadoopReadOptions.java:79)
~[k-bdg-stream.jar:?]
at
org.apache.hudi.org.apache.parquet.hadoop.ParquetReader$Builder.<init>(ParquetReader.java:198)
~[k-bdg-stream.jar:?]
at
org.apache.hudi.org.apache.parquet.avro.AvroParquetReader$Builder.<init>(AvroParquetReader.java:107)
~[k-bdg-stream.jar:?]
at
org.apache.hudi.org.apache.parquet.avro.AvroParquetReader$Builder.<init>(AvroParquetReader.java:99)
~[k-bdg-stream.jar:?]
at
org.apache.hudi.org.apache.parquet.avro.AvroParquetReader.builder(AvroParquetReader.java:48)
~[k-bdg-stream.jar:?]
at
org.apache.hudi.io.storage.HoodieParquetReader.getRecordIterator(HoodieParquetReader.java:65)
~[k-bdg-stream.jar:?]
at
org.apache.hudi.table.action.commit.FlinkMergeHelper.runMerge(FlinkMergeHelper.java:89)
~[k-bdg-stream.jar:?]
at
org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdateInternal(HoodieFlinkCopyOnWriteTable.java:338)
~[k-bdg-stream.jar:?]
at
org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdate(HoodieFlinkCopyOnWriteTable.java:329)
~[k-bdg-stream.jar:?]
at
org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor.compact(HoodieFlinkMergeOnReadTableCompactor.java:146)
~[k-bdg-stream.jar:?]
at
org.apache.hudi.table.action.compact.FlinkCompactHelpers.compact(FlinkCompactHelpers.java:95)
~[k-bdg-stream.jar:?]
at
org.apache.hudi.sink.compact.CompactFunction.doCompaction(CompactFunction.java:101)
~[k-bdg-stream.jar:?]
at
org.apache.hudi.sink.compact.CompactFunction.lambda$processElement$0(CompactFunction.java:91)
~[k-bdg-stream.jar:?]
at
org.apache.hudi.sink.utils.NonThrownExecutor.lambda$execute$0(NonThrownExecutor.java:67)
~[k-bdg-stream.jar:?]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_252]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_252]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.mapreduce.lib.input.FileInputFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
~[?:1.8.0_252]
at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_252]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
~[?:1.8.0_252]
at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_252]
... 30 more
2021-10-14 14:18:54,783 INFO org.apache.flink.runtime.taskmanager.Task
[] - Attempting to cancel task hoodie_stream_write (4/8)#0
(0f52575916f8c6ea1f60f8d7cc213bba).
2021-10-14 14:18:54,784 INFO org.apache.flink.runtime.taskmanager.Task
[] - hoodie_stream_write (4/8)#0
(0f52575916f8c6ea1f60f8d7cc213bba) switched from RUNNING to CANCELING.
2021-10-14 14:18:54,784 INFO org.apache.flink.runtime.taskmanager.Task
[] - Triggering cancellation of task code hoodie_stream_write
(4/8)#0 (0f52575916f8c6ea1f60f8d7cc213bba).
2021-10-14 14:18:54,786 INFO org.apache.flink.runtime.taskmanager.Task
[] - Attempting to cancel task hoodie_stream_write (5/8)#0
(40e056157180fbedb677e193c9f2f921).
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]