loukey-lj commented on code in PR #6602:
URL: https://github.com/apache/hudi/pull/6602#discussion_r963168938
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java:
##########
@@ -270,7 +270,7 @@ public long getCurrentSize() throws IOException {
if (output == null) {
return 0;
}
- return output.getPos();
+ return output.getPos() + logFile.getFileSize();
Review Comment:
@yihua When appending data to an existing log file, the output stream returned by
org.apache.hudi.common.table.log.HoodieLogFormatWriter#getOutputStream
always starts at position 0. As a result, after a flush,
org.apache.hudi.common.table.log.HoodieLogFormatWriter#getCurrentSize returns
only the size of the newly appended data, not the total size of the entire
file. You can reproduce this by debugging the following code.
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000 * 20);
env.setParallelism(1);
StreamTableEnvironment tableEnvironment =
StreamTableEnvironment.create(env);
final DataStreamSource<Tuple3<String, Long, Long>>
tuple3DataStreamSource = env.addSource(new SourceFunction<Tuple3<String, Long,
Long>>() {
@Override
public void run(SourceContext<Tuple3<String, Long, Long>> ctx)
throws Exception {
while (!Thread.interrupted()){
ctx.collect(new Tuple3<>("1",System.currentTimeMillis(),
System.currentTimeMillis()));
Thread.sleep(1000 * 10 );
}
}
@Override
public void cancel() {
}
});
tableEnvironment.createTemporaryView("s", tuple3DataStreamSource);
tableEnvironment.executeSql("\n" +
"\n" +
"create table if not exists h(\n" +
" `id` string PRIMARY KEY NOT ENFORCED , \n" +
" `ts` bigint , \n" +
" `time` bigint \n" +
") \n" +
"with (\n" +
"\t'connector' = 'hudi',\n
'write.bucket_assign.tasks'='1', " +
"\t'hoodie.datasource.write.keygenerator.class'='org.apache.hudi.keygen.SimpleAvroKeyGenerator',\n"
+
"\t'table.type' = 'MERGE_ON_READ',\n" +
"\t'hive_sync.enable' = 'false',\n" +
"\t'write.tasks'='1',\n" +
"\t'path' = 'hdfs://xx',\n" +
"\t'hoodie.cleaner.commits.retained' = '1'\n" +
")\n");
tableEnvironment.executeSql("insert into h SELECT * from s \n");
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]