xmg333 opened a new issue, #9381:
URL: https://github.com/apache/hudi/issues/9381

   
   **Describe the problem you faced**
   
   Flink only creates the `.hoodie` directory and writes several `rollback` files into it. No data or logs are written, and no exception is thrown.
   The odd part is that the problem only reproduces with certain Kafka topics; I have managed to write a table successfully before.
   
   This may be the same issue as [https://github.com/apache/hudi/issues/3704](https://github.com/apache/hudi/issues/3704).
   I tried setting 'write.tasks' to 1 and to the number of task slots, as @Shraddhak28 mentioned there. Nothing changed.
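   
   For reference, a minimal sketch of how `write.tasks` can be passed to the pipeline builder. The exact call used in that attempt is not shown in this report, so the helper name `builderWithWriteTasks`, its parameters, and the placement of the option are assumptions made for illustration; `FlinkOptions.WRITE_TASKS` is the Hudi option whose key is `write.tasks`.
   
   ```scala
   import org.apache.hudi.configuration.FlinkOptions
   import org.apache.hudi.util.HoodiePipeline

   object WriteTasksSketch {
     // Hypothetical helper (not from the report): builds the same table
     // definition as the reproduction below, with the writer parallelism
     // set explicitly through the 'write.tasks' option.
     def builderWithWriteTasks(tableName: String, basePath: String, tasks: Int): HoodiePipeline.Builder =
       HoodiePipeline.builder(tableName)
         .column("data varchar(100)")
         .column("in_time varchar(10)")
         .pk("data")
         .partition("in_time")
         .option(FlinkOptions.TABLE_NAME.key(), tableName)
         .option(FlinkOptions.PATH.key(), basePath)
         .option(FlinkOptions.WRITE_TASKS.key(), tasks.toString)
   }
   ```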
   
   **To Reproduce**
   
   Here is my code:
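   ```scala
   import java.time.Duration

   import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
   import org.apache.flink.api.common.functions.MapFunction
   import org.apache.flink.api.common.serialization.SimpleStringSchema
   import org.apache.flink.connector.kafka.source.KafkaSource
   import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
   import org.apache.flink.streaming.api.datastream.{DataStreamSource, SingleOutputStreamOperator}
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
   import org.apache.flink.table.data.{GenericRowData, RowData, StringData}
   import org.apache.hudi.configuration.FlinkOptions
   import org.apache.hudi.util.HoodiePipeline

   // Java DataStream API, parallelism 1
   val env: StreamExecutionEnvironment =
     StreamExecutionEnvironment.getExecutionEnvironment.setParallelism(1)

   // Read raw strings from the Kafka topic, starting at the earliest offset.
   // The explicit [String] type parameters keep Scala from inferring Nothing
   // for the Java generic factory methods.
   val ds: DataStreamSource[String] = env.fromSource(
     KafkaSource.builder[String]()
       .setBootstrapServers("master:9092")
       .setTopics("ChangeRecord")
       .setValueOnlyDeserializer(new SimpleStringSchema())
       .setStartingOffsets(OffsetsInitializer.earliest())
       .build(),
     WatermarkStrategy.noWatermarks[String]()
       .withIdleness(Duration.ofSeconds(1))
       .withTimestampAssigner(new SerializableTimestampAssigner[String] {
         override def extractTimestamp(t: String, l: Long): Long = System.currentTimeMillis()
       }),
     "kafka source"
   )

   // Keep the first 50 characters of each message and pair them with the constant 1
   val resDS: SingleOutputStreamOperator[(String, Int)] = ds.map(new MapFunction[String, (String, Int)] {
     override def map(value: String): (String, Int) = (value.slice(0, 50), 1)
   })

   val tbName = "aggregation_tbl"
   val dbName = "result"

   // Table definition: 'data' is the record key, 'in_time' the partition column
   val builder: HoodiePipeline.Builder = HoodiePipeline.builder(tbName)
     .column("data varchar(100)")
     .column("in_time varchar(10)")
     .pk("data")
     .partition("in_time")
     .option(FlinkOptions.TABLE_NAME.key(), tbName)
     .option(FlinkOptions.PATH.key(), "file:///opt/warehouse/" + dbName + ".db/" + tbName)

   // Convert each tuple to RowData and attach the Hudi sink (bounded = false)
   builder.sink(
     resDS.map(
       new MapFunction[(String, Int), RowData] {
         override def map(t: (String, Int)): RowData = {
           val rowData = new GenericRowData(2)
           rowData.setField(0, StringData.fromString(t._1))
           rowData.setField(1, StringData.fromString(t._2.toString))
           rowData
         }
       }),
     false
   )
   env.execute()
   ```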
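   
   Note that the snippet above never enables checkpointing. Hudi's Flink writer commits data when a checkpoint completes, and the log below shows the latest instant on the timeline as `INFLIGHT`, so a variant of the reproduction with checkpointing turned on may be worth comparing. A minimal sketch, assuming a 10-second interval (an arbitrary value, not taken from this report) and a hypothetical object name `CheckpointedEnvSketch`:
   
   ```scala
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

   object CheckpointedEnvSketch {
     def main(args: Array[String]): Unit = {
       val env = StreamExecutionEnvironment.getExecutionEnvironment
       env.setParallelism(1)

       // Assumption: 10 s is an illustrative interval, not a recommended setting.
       env.enableCheckpointing(10000L)

       // Print the configured interval to confirm that checkpointing is on.
       println(env.getCheckpointConfig.getCheckpointInterval)

       // The source, maps and builder.sink(...) from the reproduction would be
       // attached to this env before calling env.execute().
     }
   }
   ```
   
   If the table fills in with checkpointing enabled, the missing commits were likely a configuration matter rather than something specific to the Kafka topic; if not, the topic-specific behavior still needs another explanation.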
   
   
   
   
   
   **Environment Description**
   
   * Hudi version : 0.12.0, 0.13.1
   
   * Flink version : 1.14.0, 1.16.1
   
   * Spark version : 3.1.1
   
   * Hive version : 3.1.2
   
   * Hadoop version : 3.1.3
   
   * Storage (HDFS/S3/GCS..) : LocalFS and HDFS
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   On Flink 1.14.0 in local mode, the write appears to be stuck: the 'sink' block in the Flink Web UI always shows as busy and causes backpressure. This does not happen on the newer version or on a standalone cluster.
   Here is a Flink UI screenshot in case it helps (Flink 1.16.1):
   
![flink](https://github.com/apache/hudi/assets/55586098/68bfcc7e-9ab0-4585-94c6-428fe3503305)
   
   
   **classpath and logs**
   
   ```
   /opt/jdk1.8.0_351/bin/java -cp /opt/jdk1.8.0_351/jre/lib/charsets.jar:/opt/jdk1.8.0_351/jre/lib/deploy.jar:/opt/jdk1.8.0_351/jre/lib/ext/cldrdata.jar:/opt/jdk1.8.0_351/jre/lib/ext/dnsns.jar:/opt/jdk1.8.0_351/jre/lib/ext/jaccess.jar:/opt/jdk1.8.0_351/jre/lib/ext/jfxrt.jar:/opt/jdk1.8.0_351/jre/lib/ext/localedata.jar:/opt/jdk1.8.0_351/jre/lib/ext/nashorn.jar:/opt/jdk1.8.0_351/jre/lib/ext/sunec.jar:/opt/jdk1.8.0_351/jre/lib/ext/sunjce_provider.jar:/opt/jdk1.8.0_351/jre/lib/ext/sunpkcs11.jar:/opt/jdk1.8.0_351/jre/lib/ext/zipfs.jar:/opt/jdk1.8.0_351/jre/lib/javaws.jar:/opt/jdk1.8.0_351/jre/lib/jce.jar:/opt/jdk1.8.0_351/jre/lib/jfr.jar:/opt/jdk1.8.0_351/jre/lib/jfxswt.jar:/opt/jdk1.8.0_351/jre/lib/jsse.jar:/opt/jdk1.8.0_351/jre/lib/management-agent.jar:/opt/jdk1.8.0_351/jre/lib/plugin.jar:/opt/jdk1.8.0_351/jre/lib/resources.jar:/opt/jdk1.8.0_351/jre/lib/rt.jar:/home/xmg/IdeaProjects/flink-hudi-1.16.1/target/classes:/home/xmg/.m2/repository/org/apache/flink/flink-core/1.16.1/flink-core-1.16.1.jar:/home/xmg/.m2/repository/org/apache/flink/flink-annotations/1.16.1/flink-annotations-1.16.1.jar:/home/xmg/.m2/repository/org/apache/flink/flink-metrics-core/1.16.1/flink-metrics-core-1.16.1.jar:/home/xmg/.m2/repository/org/apache/flink/flink-shaded-asm-9/9.2-15.0/flink-shaded-asm-9-9.2-15.0.jar:/home/xmg/.m2/repository/org/apache/flink/flink-shaded-jackson/2.12.4-15.0/flink-shaded-jackson-2.12.4-15.0.jar:/home/xmg/.m2/repository/org/apache/commons/commons-lang3/3.3.2/commons-lang3-3.3.2.jar:/home/xmg/.m2/repository/com/esotericsoftware/kryo/kryo/2.24.0/kryo-2.24.0.jar:/home/xmg/.m2/repository/com/esotericsoftware/minlog/minlog/1.2/minlog-1.2.jar:/home/xmg/.m2/repository/org/objenesis/objenesis/2.1/objenesis-2.1.jar:/home/xmg/.m2/repository/commons-collections/commons-collections/3.2.2/commons-collections-3.2.2.jar:/home/xmg/.m2/repository/org/apache/commons/commons-compress/1.21/commons-compress-1.21.jar:/home/xmg/.m2/repository/org/apache/flink/flink-shaded-guava/30.1.1-jre-15.0/flink-shaded-guava-30.1.1-jre-15.0.jar:/home/xmg/.m2/repository/org/slf4j/slf4j-api/1.7.32/slf4j-api-1.7.32.jar:/home/xmg/.m2/repository/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar:/home/xmg/.m2/repository/org/apache/flink/flink-shaded-force-shading/15.0/flink-shaded-force-shading-15.0.jar:/home/xmg/.m2/repository/org/apache/flink/flink-streaming-java/1.16.1/flink-streaming-java-1.16.1.jar:/home/xmg/.m2/repository/org/apache/flink/flink-file-sink-common/1.16.1/flink-file-sink-common-1.16.1.jar:/home/xmg/.m2/repository/org/apache/flink/flink-runtime/1.16.1/flink-runtime-1.16.1.jar:/home/xmg/.m2/repository/org/apache/flink/flink-rpc-core/1.16.1/flink-rpc-core-1.16.1.jar:/home/xmg/.m2/repository/org/apache/flink/flink-rpc-akka-loader/1.16.1/flink-rpc-akka-loader-1.16.1.jar:/home/xmg/.m2/repository/org/apache/flink/flink-queryable-state-client-java/1.16.1/flink-queryable-state-client-java-1.16.1.jar:/home/xmg/.m2/repository/org/apache/flink/flink-hadoop-fs/1.16.1/flink-hadoop-fs-1.16.1.jar:/home/xmg/.m2/repository/commons-io/commons-io/2.11.0/commons-io-2.11.0.jar:/home/xmg/.m2/repository/org/apache/flink/flink-shaded-zookeeper-3/3.5.9-15.0/flink-shaded-zookeeper-3-3.5.9-15.0.jar:/home/xmg/.m2/repository/org/javassist/javassist/3.24.0-GA/javassist-3.24.0-GA.jar:/home/xmg/.m2/repository/org/xerial/snappy/snappy-java/1.1.8.3/snappy-java-1.1.8.3.jar:/home/xmg/.m2/repository/org/lz4/lz4-java/1.8.0/lz4-java-1.8.0.jar:/home/xmg/.m2/repository/org/apache/flink/flink-java/1.16.1/flink-java-1.16.1.jar:/home/xmg/.m2/repository/com/twitter/chill-java/0.7.6/chill-java-0.7.6.jar:/home/xmg/.m2/repository/org/apache/commons/commons-math3/3.6.1/commons-math3-3.6.1.jar:/home/xmg/.m2/repository/org/apache/flink/flink-table-api-java-bridge/1.16.1/flink-table-api-java-bridge-1.16.1.jar:/home/xmg/.m2/repository/org/apache/flink/flink-table-api-java/1.16.1/flink-table-api-java-1.16.1.jar:/home/xmg/.m2/repository/org/apache/flink/flink-table-common/1.16.1/flink-table-common-1.16.1.jar:/home/xmg/.m2/repository/com/ibm/icu/icu4j/67.1/icu4j-67.1.jar:/home/xmg/.m2/repository/org/apache/flink/flink-table-api-bridge-base/1.16.1/flink-table-api-bridge-base-1.16.1.jar:/home/xmg/.m2/repository/org/apache/flink/flink-connector-kafka/1.16.1/flink-connector-kafka-1.16.1.jar:/home/xmg/.m2/repository/org/apache/flink/flink-connector-base/1.16.1/flink-connector-base-1.16.1.jar:/home/xmg/.m2/repository/org/apache/kafka/kafka-clients/3.2.3/kafka-clients-3.2.3.jar:/home/xmg/.m2/repository/com/github/luben/zstd-jni/1.5.2-1/zstd-jni-1.5.2-1.jar:/home/xmg/.m2/repository/org/apache/flink/flink-table-planner_2.12/1.16.1/flink-table-planner_2.12-1.16.1.jar:/home/xmg/.m2/repository/org/codehaus/janino/commons-compiler/3.0.11/commons-compiler-3.0.11.jar:/home/xmg/.m2/repository/org/codehaus/janino/janino/3.0.11/janino-3.0.11.jar:/home/xmg/.m2/repository/org/apache/flink/flink-scala_2.12/1.16.1/flink-scala_2.12-1.16.1.jar:/home/xmg/.m2/repository/org/scala-lang/scala-reflect/2.12.7/scala-reflect-2.12.7.jar:/home/xmg/.m2/repository/org/scala-lang/scala-library/2.12.7/scala-library-2.12.7.jar:/home/xmg/.m2/repository/org/scala-lang/scala-compiler/2.12.7/scala-compiler-2.12.7.jar:/home/xmg/.m2/repository/org/scala-lang/modules/scala-xml_2.12/1.0.6/scala-xml_2.12-1.0.6.jar:/home/xmg/.m2/repository/com/twitter/chill_2.12/0.7.6/chill_2.12-0.7.6.jar:/home/xmg/.m2/repository/org/apache/flink/flink-table-runtime/1.16.1/flink-table-runtime-1.16.1.jar:/home/xmg/.m2/repository/org/apache/flink/flink-cep/1.16.1/flink-cep-1.16.1.jar:/home/xmg/.m2/repository/org/apache/hudi/hudi-flink1.16-bundle/0.13.1/hudi-flink1.16-bundle-0.13.1.jar:/home/xmg/.m2/repository/org/apache/flink/flink-clients/1.16.1/flink-clients-1.16.1.jar:/home/xmg/.m2/repository/org/apache/flink/flink-optimizer/1.16.1/flink-optimizer-1.16.1.jar:/home/xmg/.m2/repository/commons-cli/commons-cli/1.5.0/commons-cli-1.5.0.jar:/home/xmg/.m2/repository/org/apache/flink/flink-runtime-web/1.16.1/flink-runtime-web-1.16.1.jar:/home/xmg/.m2/repository/org/apache/flink/flink-shaded-netty/4.1.70.Final-15.0/flink-shaded-netty-4.1.70.Final-15.0.jar:/home/xmg/.m2/repository/org/apache/logging/log4j/log4j-1.2-api/2.17.2/log4j-1.2-api-2.17.2.jar:/home/xmg/.m2/repository/org/apache/logging/log4j/log4j-api/2.17.2/log4j-api-2.17.2.jar:/home/xmg/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.17.2/log4j-slf4j-impl-2.17.2.jar:/home/xmg/.m2/repository/org/apache/logging/log4j/log4j-core/2.17.2/log4j-core-2.17.2.jar:/opt/hadoop-3.1.3/etc/hadoop:/opt/hadoop-3.1.3/share/hadoop/common/lib/*:/opt/hadoop-3.1.3/share/hadoop/common/*:/opt/hadoop-3.1.3/share/hadoop/hdfs:/opt/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/opt/hadoop-3.1.3/share/hadoop/hdfs/*:/opt/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/opt/hadoop-3.1.3/share/hadoop/mapreduce/*:/opt/hadoop-3.1.3/share/hadoop/yarn/lib/*:/opt/hadoop-3.1.3/share/hadoop/yarn/* -javaagent:/opt/idea-IC-223.8617.56/lib/idea_rt.jar=42397:/opt/idea-IC-223.8617.56/bin -Dfile.encoding=UTF-8 icu.wuhufly.Code
   SLF4J: Class path contains multiple SLF4J bindings.
   SLF4J: Found binding in 
[jar:file:/home/xmg/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.17.2/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in 
[jar:file:/opt/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
explanation.
   SLF4J: Actual binding is of type 
[org.apache.logging.slf4j.Log4jLoggerFactory]
   2023-08-07 21:27:45 WARN  KafkaSourceBuilder:449 - Offset commit on 
checkpoint is disabled because group.id is not specified
   2023-08-07 21:27:45 INFO  TypeExtractor:2107 - Field Tuple2#_2 will be 
processed as GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2023-08-07 21:27:45 INFO  TypeExtractor:2155 - class scala.Tuple2 is missing 
a default constructor so it cannot be used as a POJO type and must be processed 
as GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance and schema evolution.
   2023-08-07 21:27:46 WARN  NativeCodeLoader:60 - Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
   2023-08-07 21:27:46 INFO  HoodieTableConfig:268 - Loading table properties 
from file:/opt/warehouse/result.db/aggregation_tbl/.hoodie/hoodie.properties
   2023-08-07 21:27:46 INFO  TypeExtractor:2036 - class 
org.apache.hudi.common.model.HoodieRecord does not contain a setter for field 
key
   2023-08-07 21:27:46 INFO  TypeExtractor:2079 - Class class 
org.apache.hudi.common.model.HoodieRecord cannot be used as a POJO type because 
not all fields are valid POJO fields, and must be processed as GenericType. 
Please read the Flink documentation on "Data Types & Serialization" for details 
of the effect on performance and schema evolution.
   2023-08-07 21:27:46 INFO  TypeExtractor:2036 - class 
org.apache.hudi.common.model.HoodieRecord does not contain a setter for field 
key
   2023-08-07 21:27:46 INFO  TypeExtractor:2079 - Class class 
org.apache.hudi.common.model.HoodieRecord cannot be used as a POJO type because 
not all fields are valid POJO fields, and must be processed as GenericType. 
Please read the Flink documentation on "Data Types & Serialization" for details 
of the effect on performance and schema evolution.
   2023-08-07 21:27:46 INFO  TaskExecutorResourceUtils:281 - The configuration 
option taskmanager.cpu.cores required for local execution is not set, setting 
it to the maximal possible value.
   2023-08-07 21:27:46 INFO  TaskExecutorResourceUtils:281 - The configuration 
option taskmanager.memory.task.heap.size required for local execution is not 
set, setting it to the maximal possible value.
   2023-08-07 21:27:46 INFO  TaskExecutorResourceUtils:281 - The configuration 
option taskmanager.memory.task.off-heap.size required for local execution is 
not set, setting it to the maximal possible value.
   2023-08-07 21:27:46 INFO  TaskExecutorResourceUtils:281 - The configuration 
option taskmanager.memory.network.min required for local execution is not set, 
setting it to its default value 64 mb.
   2023-08-07 21:27:46 INFO  TaskExecutorResourceUtils:281 - The configuration 
option taskmanager.memory.network.max required for local execution is not set, 
setting it to its default value 64 mb.
   2023-08-07 21:27:46 INFO  TaskExecutorResourceUtils:281 - The configuration 
option taskmanager.memory.managed.size required for local execution is not set, 
setting it to its default value 128 mb.
   2023-08-07 21:27:46 INFO  MiniCluster:318 - Starting Flink Mini Cluster
   2023-08-07 21:27:46 INFO  MiniCluster:337 - Starting Metrics Registry
   2023-08-07 21:27:46 INFO  MetricRegistryImpl:129 - No metrics reporter 
configured, no metrics will be exposed/reported.
   2023-08-07 21:27:46 INFO  MiniCluster:344 - Starting RPC Service(s)
   2023-08-07 21:27:46 INFO  AkkaRpcServiceUtils:225 - Trying to start local 
actor system
   2023-08-07 21:27:47 INFO  Slf4jLogger:105 - Slf4jLogger started
   2023-08-07 21:27:47 INFO  AkkaRpcServiceUtils:255 - Actor system started at 
akka://flink
   2023-08-07 21:27:47 INFO  AkkaRpcServiceUtils:225 - Trying to start local 
actor system
   2023-08-07 21:27:47 INFO  Slf4jLogger:105 - Slf4jLogger started
   2023-08-07 21:27:47 INFO  AkkaRpcServiceUtils:255 - Actor system started at 
akka://flink-metrics
   2023-08-07 21:27:47 INFO  AkkaRpcService:262 - Starting RPC endpoint for 
org.apache.flink.runtime.metrics.dump.MetricQueryService at 
akka://flink-metrics/user/rpc/MetricQueryService .
   2023-08-07 21:27:47 INFO  BlobServer:164 - Created BLOB server storage 
directory /tmp/minicluster_2b00619b292b0e451240a81575316fdd/blobStorage
   2023-08-07 21:27:47 INFO  BlobServer:238 - Started BLOB server at 
0.0.0.0:32917 - max concurrent requests: 50 - max backlog: 1000
   2023-08-07 21:27:47 INFO  KerberosDelegationTokenManagerFactory:55 - Cannot 
use kerberos delegation token manager no valid kerberos credentials provided.
   2023-08-07 21:27:47 INFO  PermanentBlobCache:93 - Created BLOB cache storage 
directory /tmp/minicluster_2b00619b292b0e451240a81575316fdd/blobStorage
   2023-08-07 21:27:47 INFO  TransientBlobCache:93 - Created BLOB cache storage 
directory /tmp/minicluster_2b00619b292b0e451240a81575316fdd/blobStorage
   2023-08-07 21:27:47 INFO  MiniCluster:721 - Starting 1 TaskManager(s)
   2023-08-07 21:27:47 INFO  TaskManagerRunner:594 - Starting TaskManager with 
ResourceID: 1f195501-22f5-425d-9ebc-bd82a1bd3498
   2023-08-07 21:27:47 INFO  TaskManagerServices:466 - Temporary file directory 
'/tmp': total 228 GB, usable 60 GB (26.32% usable)
   2023-08-07 21:27:47 INFO  IOManager:60 - Created a new FileChannelManager 
for spilling of task related data to disk (joins, sorting, ...). Used 
directories:
        /tmp/flink-io-b15779ed-8264-4bff-85de-e03b8c910869
   2023-08-07 21:27:47 INFO  NettyShuffleServiceFactory:161 - Created a new 
FileChannelManager for storing result partitions of BLOCKING shuffles. Used 
directories:
        /tmp/flink-netty-shuffle-1a0c8676-6fc0-4e2c-9d87-2809cb0132dc
   2023-08-07 21:27:47 INFO  NetworkBufferPool:156 - Allocated 64 MB for 
network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
   2023-08-07 21:27:47 INFO  NettyShuffleEnvironment:353 - Starting the network 
environment and its components.
   2023-08-07 21:27:47 INFO  KvStateService:92 - Starting the kvState service 
and its components.
   2023-08-07 21:27:47 INFO  Configuration:824 - Config uses fallback 
configuration key 'akka.ask.timeout' instead of key 'taskmanager.slot.timeout'
   2023-08-07 21:27:47 INFO  AkkaRpcService:262 - Starting RPC endpoint for 
org.apache.flink.runtime.taskexecutor.TaskExecutor at 
akka://flink/user/rpc/taskmanager_0 .
   2023-08-07 21:27:47 INFO  DefaultJobLeaderService:127 - Start job leader 
service.
   2023-08-07 21:27:47 INFO  FileCache:116 - User file cache uses directory 
/tmp/flink-dist-cache-f070c8f1-4968-45d6-8ba5-162d92471240
   2023-08-07 21:27:47 INFO  DispatcherRestEndpoint:175 - Starting rest 
endpoint.
   2023-08-07 21:27:47 WARN  WebMonitorUtils:76 - Log file environment variable 
'log.file' is not set.
   2023-08-07 21:27:47 WARN  WebMonitorUtils:82 - JobManager log files are 
unavailable in the web dashboard. Log file location not found in environment 
variable 'log.file' or configuration key 'web.log.path'.
   2023-08-07 21:27:47 INFO  DispatcherRestEndpoint:298 - Rest endpoint 
listening at localhost:33597
   2023-08-07 21:27:47 INFO  EmbeddedLeaderService:308 - Proposing leadership 
to contender http://localhost:33597
   2023-08-07 21:27:47 INFO  DispatcherRestEndpoint:1008 - Web frontend 
listening at http://localhost:33597.
   2023-08-07 21:27:47 INFO  DispatcherRestEndpoint:1066 - 
http://localhost:33597 was granted leadership with 
leaderSessionID=c9466bb7-c520-4e89-9bb1-e30bca181b7b
   2023-08-07 21:27:47 INFO  EmbeddedLeaderService:256 - Received confirmation 
of leadership for leader http://localhost:33597 , 
session=c9466bb7-c520-4e89-9bb1-e30bca181b7b
   2023-08-07 21:27:47 INFO  EmbeddedLeaderService:308 - Proposing leadership 
to contender LeaderContender: DefaultDispatcherRunner
   2023-08-07 21:27:47 INFO  ResourceManagerServiceImpl:119 - Starting resource 
manager service.
   2023-08-07 21:27:47 INFO  EmbeddedLeaderService:308 - Proposing leadership 
to contender LeaderContender: ResourceManagerServiceImpl
   2023-08-07 21:27:47 INFO  DefaultDispatcherRunner:107 - 
DefaultDispatcherRunner was granted leadership with leader id 
9b63f9ae-9e22-45db-b8d9-5707ac05122a. Creating new DispatcherLeaderProcess.
   2023-08-07 21:27:47 INFO  ResourceManagerServiceImpl:199 - Resource manager 
service is granted leadership with session id 
a9bd98dd-0afe-4783-892b-2cda53dc3238.
   2023-08-07 21:27:47 INFO  MiniCluster:496 - Flink Mini Cluster started 
successfully
   2023-08-07 21:27:47 INFO  SessionDispatcherLeaderProcess:99 - Start 
SessionDispatcherLeaderProcess.
   2023-08-07 21:27:47 INFO  SessionDispatcherLeaderProcess:144 - Recover all 
persisted job graphs that are not finished, yet.
   2023-08-07 21:27:47 INFO  SessionDispatcherLeaderProcess:158 - Successfully 
recovered 0 persisted job graphs.
   2023-08-07 21:27:47 INFO  AkkaRpcService:262 - Starting RPC endpoint for 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at 
akka://flink/user/rpc/resourcemanager_1 .
   2023-08-07 21:27:47 INFO  AkkaRpcService:262 - Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at 
akka://flink/user/rpc/dispatcher_2 .
   2023-08-07 21:27:47 INFO  StandaloneResourceManager:240 - Starting the 
resource manager.
   2023-08-07 21:27:47 INFO  EmbeddedLeaderService:256 - Received confirmation 
of leadership for leader akka://flink/user/rpc/resourcemanager_1 , 
session=a9bd98dd-0afe-4783-892b-2cda53dc3238
   2023-08-07 21:27:47 INFO  EmbeddedLeaderService:256 - Received confirmation 
of leadership for leader akka://flink/user/rpc/dispatcher_2 , 
session=9b63f9ae-9e22-45db-b8d9-5707ac05122a
   2023-08-07 21:27:47 INFO  TaskExecutor:1361 - Connecting to ResourceManager 
akka://flink/user/rpc/resourcemanager_1(892b2cda53dc3238a9bd98dd0afe4783).
   2023-08-07 21:27:47 INFO  TaskExecutor:162 - Resolved ResourceManager 
address, beginning registration
   2023-08-07 21:27:47 INFO  StandaloneResourceManager:1004 - Registering 
TaskManager with ResourceID 1f195501-22f5-425d-9ebc-bd82a1bd3498 
(akka://flink/user/rpc/taskmanager_0) at ResourceManager
   2023-08-07 21:27:47 INFO  TaskExecutor:99 - Successful registration at 
resource manager akka://flink/user/rpc/resourcemanager_1 under registration id 
6fc2b822292b1854af2eee817e753273.
   2023-08-07 21:27:47 INFO  StandaloneDispatcher:431 - Received JobGraph 
submission 'Flink Streaming Job' (29851d6b0633d89bf05b73ad96dbbf26).
   2023-08-07 21:27:47 INFO  StandaloneDispatcher:531 - Submitting job 'Flink 
Streaming Job' (29851d6b0633d89bf05b73ad96dbbf26).
   2023-08-07 21:27:47 INFO  EmbeddedLeaderService:308 - Proposing leadership 
to contender LeaderContender: JobMasterServiceLeadershipRunner
   2023-08-07 21:27:47 INFO  AkkaRpcService:262 - Starting RPC endpoint for 
org.apache.flink.runtime.jobmaster.JobMaster at 
akka://flink/user/rpc/jobmanager_3 .
   2023-08-07 21:27:47 INFO  JobMaster:312 - Initializing job 'Flink Streaming 
Job' (29851d6b0633d89bf05b73ad96dbbf26).
   2023-08-07 21:27:47 INFO  JobMaster:103 - Using restart back off time 
strategy NoRestartBackoffTimeStrategy for Flink Streaming Job 
(29851d6b0633d89bf05b73ad96dbbf26).
   2023-08-07 21:27:47 INFO  ExecutionGraph:393 - Created execution graph 
6844b5c34e75728cf010a084a584dd3b for job 29851d6b0633d89bf05b73ad96dbbf26.
   2023-08-07 21:27:47 INFO  JobMaster:161 - Running initialization on master 
for job Flink Streaming Job (29851d6b0633d89bf05b73ad96dbbf26).
   2023-08-07 21:27:47 INFO  JobMaster:190 - Successfully ran initialization on 
master in 0 ms.
   2023-08-07 21:27:47 INFO  DefaultExecutionTopology:415 - Built 1 new 
pipelined regions in 0 ms, total 1 pipelined regions currently.
   2023-08-07 21:27:47 INFO  JobMaster:263 - No state backend has been 
configured, using default (HashMap) 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend@3348feed
   2023-08-07 21:27:47 INFO  StateBackendLoader:321 - State backend loader 
loads the state backend as HashMapStateBackend
   2023-08-07 21:27:47 INFO  JobMaster:274 - Checkpoint storage is set to 
'jobmanager'
   2023-08-07 21:27:47 INFO  CheckpointCoordinator:1655 - No checkpoint found 
during restore.
   2023-08-07 21:27:47 INFO  JobMaster:162 - Using failover strategy 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@93bcfa5
 for Flink Streaming Job (29851d6b0633d89bf05b73ad96dbbf26).
   2023-08-07 21:27:47 INFO  EmbeddedLeaderService:256 - Received confirmation 
of leadership for leader akka://flink/user/rpc/jobmanager_3 , 
session=91f9564a-d15f-4349-887d-11f85dee9394
   2023-08-07 21:27:47 INFO  JobMaster:957 - Starting execution of job 'Flink 
Streaming Job' (29851d6b0633d89bf05b73ad96dbbf26) under job master id 
887d11f85dee939491f9564ad15f4349.
   2023-08-07 21:27:47 INFO  StreamerUtil:209 - Table 
[file:///opt/warehouse/result.db/aggregation_tbl/aggregation_tbl] already 
exists, no need to initialize the table
   2023-08-07 21:27:47 INFO  HoodieTableMetaClient:130 - Loading 
HoodieTableMetaClient from file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:47 INFO  HoodieTableConfig:268 - Loading table properties 
from file:/opt/warehouse/result.db/aggregation_tbl/.hoodie/hoodie.properties
   2023-08-07 21:27:47 INFO  HoodieTableMetaClient:149 - Finished Loading Table 
of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from 
file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  EmbeddedTimelineService:67 - Starting Timeline 
service !!
   2023-08-07 21:27:48 WARN  EmbeddedTimelineService:114 - Unable to find 
driver bind address from spark config
   2023-08-07 21:27:48 INFO  FileSystemViewManager:245 - Creating View Manager 
with storage type :MEMORY
   2023-08-07 21:27:48 INFO  FileSystemViewManager:257 - Creating in-memory 
based Table View
   2023-08-07 21:27:48 INFO  log:170 - Logging initialized @3262ms to 
org.apache.hudi.org.apache.jetty.util.log.Slf4jLog
   2023-08-07 21:27:48 INFO  Javalin:22 - 
          __                      __ _            __ __
         / /____ _ _   __ ____ _ / /(_)____      / // /
    __  / // __ `/| | / // __ `// // // __ \    / // /_
   / /_/ // /_/ / | |/ // /_/ // // // / / /   /__  __/
   \____/ \__,_/  |___/ \__,_//_//_//_/ /_/      /_/
   
             https://javalin.io/documentation
   
   2023-08-07 21:27:48 INFO  Javalin:17 - Starting Javalin ...
   2023-08-07 21:27:48 INFO  Javalin:17 - You are running Javalin 4.6.7 
(released October 24, 2022. Your Javalin version is 287 days old. Consider 
checking for a newer version.).
   2023-08-07 21:27:48 INFO  Server:375 - jetty-9.4.48.v20220622; built: 
2022-06-21T20:42:25.880Z; git: 6b67c5719d1f4371b33655ff2d047d24e171e49a; jvm 
1.8.0_351-b10
   2023-08-07 21:27:48 INFO  Server:415 - Started @3432ms
   2023-08-07 21:27:48 INFO  Javalin:17 - Listening on http://localhost:38467/
   2023-08-07 21:27:48 INFO  Javalin:17 - Javalin started in 81ms \o/
   2023-08-07 21:27:48 INFO  TimelineService:363 - Starting Timeline server on 
port :38467
   2023-08-07 21:27:48 INFO  EmbeddedTimelineService:106 - Started embedded 
timeline server at 192.168.8.233:38467
   2023-08-07 21:27:48 INFO  BaseHoodieClient:133 - Timeline Server already 
running. Not restarting the service
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:130 - Loading 
HoodieTableMetaClient from file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieTableConfig:268 - Loading table properties 
from file:/opt/warehouse/result.db/aggregation_tbl/.hoodie/hoodie.properties
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:149 - Finished Loading Table 
of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from 
file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:152 - Loading Active commit 
timeline for file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:171 - Loaded instants upto : 
Option{val=[==>20230807211441479__commit__INFLIGHT]}
   2023-08-07 21:27:48 INFO  FileSystemViewManager:245 - Creating View Manager 
with storage type :REMOTE_FIRST
   2023-08-07 21:27:48 INFO  FileSystemViewManager:265 - Creating remote first 
table view
   2023-08-07 21:27:48 INFO  SourceCoordinator:197 - Starting split enumerator 
for source Source: kafka source.
   2023-08-07 21:27:48 INFO  JobMaster:210 - Starting scheduling with 
scheduling strategy 
[org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
   2023-08-07 21:27:48 INFO  ExecutionGraph:1152 - Job Flink Streaming Job 
(29851d6b0633d89bf05b73ad96dbbf26) switched from state CREATED to RUNNING.
   2023-08-07 21:27:48 INFO  ExecutionGraph:1435 - Source: kafka source -> Map 
-> Map -> row_data_to_hoodie_record (1/1) 
(6844b5c34e75728cf010a084a584dd3b_cbc357ccb763df2852fee8c4fc7d55f2_0_0) 
switched from CREATED to SCHEDULED.
   2023-08-07 21:27:48 INFO  ExecutionGraph:1435 - bucket_assigner (1/1) 
(6844b5c34e75728cf010a084a584dd3b_53bbce12f8152caf7f43137342f64593_0_0) 
switched from CREATED to SCHEDULED.
   2023-08-07 21:27:48 INFO  ExecutionGraph:1435 - stream_write: 
aggregation_tbl -> Sink: clean_commits (1/1) 
(6844b5c34e75728cf010a084a584dd3b_d04a2ee1f26fe7f52eb5b5e2df59b47e_0_0) 
switched from CREATED to SCHEDULED.
   2023-08-07 21:27:48 INFO  JobMaster:1152 - Connecting to ResourceManager 
akka://flink/user/rpc/resourcemanager_1(892b2cda53dc3238a9bd98dd0afe4783)
   2023-08-07 21:27:48 INFO  JobMaster:162 - Resolved ResourceManager address, 
beginning registration
   2023-08-07 21:27:48 INFO  StandaloneResourceManager:376 - Registering job 
manager 887d11f85dee939491f9564ad15f4349@akka://flink/user/rpc/jobmanager_3 for 
job 29851d6b0633d89bf05b73ad96dbbf26.
   2023-08-07 21:27:48 INFO  StandaloneResourceManager:941 - Registered job 
manager 887d11f85dee939491f9564ad15f4349@akka://flink/user/rpc/jobmanager_3 for 
job 29851d6b0633d89bf05b73ad96dbbf26.
   2023-08-07 21:27:48 INFO  JobMaster:1176 - JobManager successfully 
registered at ResourceManager, leader id: 892b2cda53dc3238a9bd98dd0afe4783.
   2023-08-07 21:27:48 INFO  DeclarativeSlotManager:294 - Received resource 
requirements from job 29851d6b0633d89bf05b73ad96dbbf26: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=1}]
   2023-08-07 21:27:48 INFO  KafkaSourceEnumerator:164 - Starting the 
KafkaSourceEnumerator for consumer group null without periodic partition 
discovery.
   2023-08-07 21:27:48 INFO  TaskExecutor:1084 - Receive slot request 
4b046c9bec4a797c54e4979f994339aa for job 29851d6b0633d89bf05b73ad96dbbf26 from 
resource manager with leader id 892b2cda53dc3238a9bd98dd0afe4783.
   2023-08-07 21:27:48 INFO  TaskExecutor:1179 - Allocated slot for 
4b046c9bec4a797c54e4979f994339aa.
   2023-08-07 21:27:48 INFO  DefaultJobLeaderService:192 - Add job 
29851d6b0633d89bf05b73ad96dbbf26 for job leader monitoring.
   2023-08-07 21:27:48 INFO  DefaultJobLeaderService:350 - Try to register at 
job manager akka://flink/user/rpc/jobmanager_3 with leader id 
91f9564a-d15f-4349-887d-11f85dee9394.
   2023-08-07 21:27:48 INFO  DefaultJobLeaderService:162 - Resolved JobManager 
address, beginning registration
   2023-08-07 21:27:48 INFO  DefaultJobLeaderService:417 - Successful 
registration at job manager akka://flink/user/rpc/jobmanager_3 for job 
29851d6b0633d89bf05b73ad96dbbf26.
   2023-08-07 21:27:48 INFO  TaskExecutor:1665 - Establish JobManager 
connection for job 29851d6b0633d89bf05b73ad96dbbf26.
   2023-08-07 21:27:48 INFO  TaskExecutor:1516 - Offer reserved slots to the 
leader of job 29851d6b0633d89bf05b73ad96dbbf26.
   2023-08-07 21:27:48 INFO  ExecutionGraph:1435 - Source: kafka source -> Map 
-> Map -> row_data_to_hoodie_record (1/1) 
(6844b5c34e75728cf010a084a584dd3b_cbc357ccb763df2852fee8c4fc7d55f2_0_0) 
switched from SCHEDULED to DEPLOYING.
   2023-08-07 21:27:48 INFO  ExecutionGraph:561 - Deploying Source: kafka 
source -> Map -> Map -> row_data_to_hoodie_record (1/1) (attempt #0) with 
attempt id 
6844b5c34e75728cf010a084a584dd3b_cbc357ccb763df2852fee8c4fc7d55f2_0_0 and 
vertex id cbc357ccb763df2852fee8c4fc7d55f2_0 to 
1f195501-22f5-425d-9ebc-bd82a1bd3498 @ localhost (dataPort=-1) with allocation 
id 4b046c9bec4a797c54e4979f994339aa
   2023-08-07 21:27:48 INFO  ExecutionGraph:1435 - bucket_assigner (1/1) 
(6844b5c34e75728cf010a084a584dd3b_53bbce12f8152caf7f43137342f64593_0_0) 
switched from SCHEDULED to DEPLOYING.
   2023-08-07 21:27:48 INFO  ExecutionGraph:561 - Deploying bucket_assigner 
(1/1) (attempt #0) with attempt id 
6844b5c34e75728cf010a084a584dd3b_53bbce12f8152caf7f43137342f64593_0_0 and 
vertex id 53bbce12f8152caf7f43137342f64593_0 to 
1f195501-22f5-425d-9ebc-bd82a1bd3498 @ localhost (dataPort=-1) with allocation 
id 4b046c9bec4a797c54e4979f994339aa
   2023-08-07 21:27:48 INFO  TaskSlotTableImpl:388 - Activate slot 
4b046c9bec4a797c54e4979f994339aa.
   2023-08-07 21:27:48 INFO  ExecutionGraph:1435 - stream_write: 
aggregation_tbl -> Sink: clean_commits (1/1) 
(6844b5c34e75728cf010a084a584dd3b_d04a2ee1f26fe7f52eb5b5e2df59b47e_0_0) 
switched from SCHEDULED to DEPLOYING.
   2023-08-07 21:27:48 INFO  ExecutionGraph:561 - Deploying stream_write: 
aggregation_tbl -> Sink: clean_commits (1/1) (attempt #0) with attempt id 
6844b5c34e75728cf010a084a584dd3b_d04a2ee1f26fe7f52eb5b5e2df59b47e_0_0 and 
vertex id d04a2ee1f26fe7f52eb5b5e2df59b47e_0 to 
1f195501-22f5-425d-9ebc-bd82a1bd3498 @ localhost (dataPort=-1) with allocation 
id 4b046c9bec4a797c54e4979f994339aa
   2023-08-07 21:27:48 INFO  StateChangelogStorageLoader:84 - 
StateChangelogStorageLoader initialized with shortcut names {memory}.
   2023-08-07 21:27:48 INFO  StateChangelogStorageLoader:106 - Creating a 
changelog storage with name 'memory'.
   2023-08-07 21:27:48 INFO  TaskExecutor:760 - Received task Source: kafka 
source -> Map -> Map -> row_data_to_hoodie_record (1/1)#0 
(6844b5c34e75728cf010a084a584dd3b_cbc357ccb763df2852fee8c4fc7d55f2_0_0), deploy 
into slot with allocation id 4b046c9bec4a797c54e4979f994339aa.
   2023-08-07 21:27:48 INFO  Task:1067 - Source: kafka source -> Map -> Map -> 
row_data_to_hoodie_record (1/1)#0 
(6844b5c34e75728cf010a084a584dd3b_cbc357ccb763df2852fee8c4fc7d55f2_0_0) 
switched from CREATED to DEPLOYING.
   2023-08-07 21:27:48 INFO  TaskSlotTableImpl:388 - Activate slot 
4b046c9bec4a797c54e4979f994339aa.
   2023-08-07 21:27:48 INFO  Task:610 - Loading JAR files for task Source: 
kafka source -> Map -> Map -> row_data_to_hoodie_record (1/1)#0 
(6844b5c34e75728cf010a084a584dd3b_cbc357ccb763df2852fee8c4fc7d55f2_0_0) 
[DEPLOYING].
   2023-08-07 21:27:48 INFO  TaskExecutor:760 - Received task bucket_assigner 
(1/1)#0 
(6844b5c34e75728cf010a084a584dd3b_53bbce12f8152caf7f43137342f64593_0_0), deploy 
into slot with allocation id 4b046c9bec4a797c54e4979f994339aa.
   2023-08-07 21:27:48 INFO  Task:1067 - bucket_assigner (1/1)#0 
(6844b5c34e75728cf010a084a584dd3b_53bbce12f8152caf7f43137342f64593_0_0) 
switched from CREATED to DEPLOYING.
   2023-08-07 21:27:48 INFO  Task:610 - Loading JAR files for task 
bucket_assigner (1/1)#0 
(6844b5c34e75728cf010a084a584dd3b_53bbce12f8152caf7f43137342f64593_0_0) 
[DEPLOYING].
   2023-08-07 21:27:48 INFO  TaskSlotTableImpl:388 - Activate slot 
4b046c9bec4a797c54e4979f994339aa.
   2023-08-07 21:27:48 INFO  TaskSlotTableImpl:388 - Activate slot 
4b046c9bec4a797c54e4979f994339aa.
   2023-08-07 21:27:48 INFO  TaskExecutor:760 - Received task stream_write: 
aggregation_tbl -> Sink: clean_commits (1/1)#0 
(6844b5c34e75728cf010a084a584dd3b_d04a2ee1f26fe7f52eb5b5e2df59b47e_0_0), deploy 
into slot with allocation id 4b046c9bec4a797c54e4979f994339aa.
   2023-08-07 21:27:48 INFO  Task:1067 - stream_write: aggregation_tbl -> Sink: 
clean_commits (1/1)#0 
(6844b5c34e75728cf010a084a584dd3b_d04a2ee1f26fe7f52eb5b5e2df59b47e_0_0) 
switched from CREATED to DEPLOYING.
   2023-08-07 21:27:48 INFO  Task:610 - Loading JAR files for task 
stream_write: aggregation_tbl -> Sink: clean_commits (1/1)#0 
(6844b5c34e75728cf010a084a584dd3b_d04a2ee1f26fe7f52eb5b5e2df59b47e_0_0) 
[DEPLOYING].
   2023-08-07 21:27:48 INFO  StreamTask:263 - No state backend has been 
configured, using default (HashMap) 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend@25a6d880
   2023-08-07 21:27:48 INFO  StateBackendLoader:321 - State backend loader 
loads the state backend as HashMapStateBackend
   2023-08-07 21:27:48 INFO  StreamTask:274 - Checkpoint storage is set to 
'jobmanager'
   2023-08-07 21:27:48 INFO  StreamTask:263 - No state backend has been 
configured, using default (HashMap) 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend@fdcd039
   2023-08-07 21:27:48 INFO  StreamTask:263 - No state backend has been 
configured, using default (HashMap) 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend@1df9465
   2023-08-07 21:27:48 INFO  StateBackendLoader:321 - State backend loader 
loads the state backend as HashMapStateBackend
   2023-08-07 21:27:48 INFO  StateBackendLoader:321 - State backend loader 
loads the state backend as HashMapStateBackend
   2023-08-07 21:27:48 INFO  StreamTask:274 - Checkpoint storage is set to 
'jobmanager'
   2023-08-07 21:27:48 INFO  StreamTask:274 - Checkpoint storage is set to 
'jobmanager'
   2023-08-07 21:27:48 INFO  Task:1067 - Source: kafka source -> Map -> Map -> 
row_data_to_hoodie_record (1/1)#0 
(6844b5c34e75728cf010a084a584dd3b_cbc357ccb763df2852fee8c4fc7d55f2_0_0) 
switched from DEPLOYING to INITIALIZING.
   2023-08-07 21:27:48 INFO  Task:1067 - stream_write: aggregation_tbl -> Sink: 
clean_commits (1/1)#0 
(6844b5c34e75728cf010a084a584dd3b_d04a2ee1f26fe7f52eb5b5e2df59b47e_0_0) 
switched from DEPLOYING to INITIALIZING.
   2023-08-07 21:27:48 INFO  Task:1067 - bucket_assigner (1/1)#0 
(6844b5c34e75728cf010a084a584dd3b_53bbce12f8152caf7f43137342f64593_0_0) 
switched from DEPLOYING to INITIALIZING.
   2023-08-07 21:27:48 INFO  ExecutionGraph:1435 - Source: kafka source -> Map 
-> Map -> row_data_to_hoodie_record (1/1) 
(6844b5c34e75728cf010a084a584dd3b_cbc357ccb763df2852fee8c4fc7d55f2_0_0) 
switched from DEPLOYING to INITIALIZING.
   2023-08-07 21:27:48 INFO  ExecutionGraph:1435 - bucket_assigner (1/1) 
(6844b5c34e75728cf010a084a584dd3b_53bbce12f8152caf7f43137342f64593_0_0) 
switched from DEPLOYING to INITIALIZING.
   2023-08-07 21:27:48 INFO  ExecutionGraph:1435 - stream_write: 
aggregation_tbl -> Sink: clean_commits (1/1) 
(6844b5c34e75728cf010a084a584dd3b_d04a2ee1f26fe7f52eb5b5e2df59b47e_0_0) 
switched from DEPLOYING to INITIALIZING.
   2023-08-07 21:27:48 WARN  KafkaSourceReader:79 - Offset commit on checkpoint 
is disabled. Consuming offset will not be reported back to Kafka cluster.
   2023-08-07 21:27:48 INFO  HeapKeyedStateBackendBuilder:170 - Finished to 
build heap keyed state-backend.
   2023-08-07 21:27:48 INFO  HeapKeyedStateBackend:175 - Initializing heap 
keyed state backend with stream factory.
   2023-08-07 21:27:48 INFO  SourceCoordinator:578 - Source Source: kafka 
source registering reader for parallel task 0 (#0) @ 
   2023-08-07 21:27:48 INFO  Task:1067 - Source: kafka source -> Map -> Map -> 
row_data_to_hoodie_record (1/1)#0 
(6844b5c34e75728cf010a084a584dd3b_cbc357ccb763df2852fee8c4fc7d55f2_0_0) 
switched from INITIALIZING to RUNNING.
   2023-08-07 21:27:48 INFO  ViewStorageProperties:69 - Loading filesystem view 
storage properties from 
file:/opt/warehouse/result.db/aggregation_tbl/.hoodie/.aux/view_storage_conf.properties
   2023-08-07 21:27:48 INFO  ExecutionGraph:1435 - Source: kafka source -> Map 
-> Map -> row_data_to_hoodie_record (1/1) 
(6844b5c34e75728cf010a084a584dd3b_cbc357ccb763df2852fee8c4fc7d55f2_0_0) 
switched from INITIALIZING to RUNNING.
   2023-08-07 21:27:48 INFO  ViewStorageProperties:69 - Loading filesystem view 
storage properties from 
file:/opt/warehouse/result.db/aggregation_tbl/.hoodie/.aux/view_storage_conf.properties
   2023-08-07 21:27:48 INFO  BaseHoodieClient:136 - Embedded Timeline Server is 
disabled. Not starting timeline service
   2023-08-07 21:27:48 INFO  BaseHoodieClient:136 - Embedded Timeline Server is 
disabled. Not starting timeline service
   2023-08-07 21:27:48 INFO  CleanFunction:66 - exec clean with instant time 
20230807212748465...
   2023-08-07 21:27:48 INFO  HeapKeyedStateBackendBuilder:170 - Finished to 
build heap keyed state-backend.
   2023-08-07 21:27:48 INFO  HeapKeyedStateBackend:175 - Initializing heap 
keyed state backend with stream factory.
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:130 - Loading 
HoodieTableMetaClient from file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:130 - Loading 
HoodieTableMetaClient from file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieTableConfig:268 - Loading table properties 
from file:/opt/warehouse/result.db/aggregation_tbl/.hoodie/hoodie.properties
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:149 - Finished Loading Table 
of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from 
file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:152 - Loading Active commit 
timeline for file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:171 - Loaded instants upto : 
Option{val=[==>20230807211441479__commit__INFLIGHT]}
   2023-08-07 21:27:48 INFO  FileSystemViewManager:245 - Creating View Manager 
with storage type :REMOTE_FIRST
   2023-08-07 21:27:48 INFO  FileSystemViewManager:265 - Creating remote first 
table view
   2023-08-07 21:27:48 INFO  BaseHoodieWriteClient:543 - Cleaner started
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:130 - Loading 
HoodieTableMetaClient from file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:130 - Loading 
HoodieTableMetaClient from file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieTableConfig:268 - Loading table properties 
from file:/opt/warehouse/result.db/aggregation_tbl/.hoodie/hoodie.properties
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:149 - Finished Loading Table 
of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from 
file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieTableConfig:268 - Loading table properties 
from file:/opt/warehouse/result.db/aggregation_tbl/.hoodie/hoodie.properties
   2023-08-07 21:27:48 INFO  HoodieTableConfig:268 - Loading table properties 
from file:/opt/warehouse/result.db/aggregation_tbl/.hoodie/hoodie.properties
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:149 - Finished Loading Table 
of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from 
file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:149 - Finished Loading Table 
of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from 
file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:152 - Loading Active commit 
timeline for file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:130 - Loading 
HoodieTableMetaClient from file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieTableConfig:268 - Loading table properties 
from file:/opt/warehouse/result.db/aggregation_tbl/.hoodie/hoodie.properties
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:149 - Finished Loading Table 
of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from 
file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:171 - Loaded instants upto : 
Option{val=[==>20230807211441479__commit__INFLIGHT]}
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:152 - Loading Active commit 
timeline for file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  FileSystemViewManager:245 - Creating View Manager 
with storage type :REMOTE_FIRST
   2023-08-07 21:27:48 INFO  FileSystemViewManager:265 - Creating remote first 
table view
   2023-08-07 21:27:48 INFO  BaseHoodieWriteClient:431 - Scheduling cleaning at 
instant time :20230807212748465
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:171 - Loaded instants upto : 
Option{val=[==>20230807211441479__commit__INFLIGHT]}
   2023-08-07 21:27:48 INFO  FileSystemViewManager:245 - Creating View Manager 
with storage type :REMOTE_FIRST
   2023-08-07 21:27:48 INFO  FileSystemViewManager:265 - Creating remote first 
table view
   2023-08-07 21:27:48 INFO  FileSystemViewManager:214 - Creating remote view 
for basePath file:/opt/warehouse/result.db/aggregation_tbl. 
Server=192.168.8.233:38467, Timeout=300
   2023-08-07 21:27:48 INFO  FileSystemViewManager:165 - Creating InMemory 
based view for basePath file:/opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:171 - Loaded instants upto : 
Option{val=[==>20230807211441479__commit__INFLIGHT]}
   2023-08-07 21:27:48 INFO  AbstractTableFileSystemView:258 - Took 2 ms to 
read  0 instants, 0 replaced file groups
   2023-08-07 21:27:48 INFO  CleanFunction:131 - Executor executes action [wait 
for cleaning finish] success!
   2023-08-07 21:27:48 INFO  ViewStorageProperties:69 - Loading filesystem view 
storage properties from 
file:/opt/warehouse/result.db/aggregation_tbl/.hoodie/.aux/view_storage_conf.properties
   2023-08-07 21:27:48 INFO  BaseHoodieClient:136 - Embedded Timeline Server is 
disabled. Not starting timeline service
   2023-08-07 21:27:48 INFO  BaseHoodieClient:136 - Embedded Timeline Server is 
disabled. Not starting timeline service
   2023-08-07 21:27:48 INFO  ClusteringUtils:140 - Found 0 files in pending 
clustering operations
   2023-08-07 21:27:48 INFO  TypeExtractor:2107 - Field 
WriteMetadataEvent#writeStatuses will be processed as GenericType. Please read 
the Flink documentation on "Data Types & Serialization" for details of the 
effect on performance and schema evolution.
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:171 - Loaded instants upto : 
Option{val=[==>20230807211441479__commit__INFLIGHT]}
   2023-08-07 21:27:48 INFO  WriteProfile:170 - Refresh average bytes per 
record => 1024
   2023-08-07 21:27:48 INFO  AbstractStreamWriteFunction:228 - Send bootstrap 
write metadata event to coordinator, task[0].
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:130 - Loading 
HoodieTableMetaClient from file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  StreamWriteFunction:204 - init hoodie merge with 
class [org.apache.hudi.common.model.HoodieAvroRecordMerger]
   2023-08-07 21:27:48 INFO  Task:1067 - bucket_assigner (1/1)#0 
(6844b5c34e75728cf010a084a584dd3b_53bbce12f8152caf7f43137342f64593_0_0) 
switched from INITIALIZING to RUNNING.
   2023-08-07 21:27:48 INFO  Task:1067 - stream_write: aggregation_tbl -> Sink: 
clean_commits (1/1)#0 
(6844b5c34e75728cf010a084a584dd3b_d04a2ee1f26fe7f52eb5b5e2df59b47e_0_0) 
switched from INITIALIZING to RUNNING.
   2023-08-07 21:27:48 INFO  ExecutionGraph:1435 - bucket_assigner (1/1) 
(6844b5c34e75728cf010a084a584dd3b_53bbce12f8152caf7f43137342f64593_0_0) 
switched from INITIALIZING to RUNNING.
   2023-08-07 21:27:48 INFO  ExecutionGraph:1435 - stream_write: 
aggregation_tbl -> Sink: clean_commits (1/1) 
(6844b5c34e75728cf010a084a584dd3b_d04a2ee1f26fe7f52eb5b5e2df59b47e_0_0) 
switched from INITIALIZING to RUNNING.
   2023-08-07 21:27:48 INFO  HoodieTableConfig:268 - Loading table properties 
from file:/opt/warehouse/result.db/aggregation_tbl/.hoodie/hoodie.properties
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:149 - Finished Loading Table 
of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from 
file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:171 - Loaded instants upto : 
Option{val=[==>20230807211441479__commit__INFLIGHT]}
   2023-08-07 21:27:48 INFO  StreamWriteOperatorCoordinator:131 - Executor 
executes action [handle write metadata event for instant ] success!
   2023-08-07 21:27:48 INFO  CleanerUtils:155 - Cleaned failed attempts if any
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:130 - Loading 
HoodieTableMetaClient from file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieTableConfig:268 - Loading table properties 
from file:/opt/warehouse/result.db/aggregation_tbl/.hoodie/hoodie.properties
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:149 - Finished Loading Table 
of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from 
file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:152 - Loading Active commit 
timeline for file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:171 - Loaded instants upto : 
Option{val=[==>20230807211441479__commit__INFLIGHT]}
   2023-08-07 21:27:48 INFO  FileSystemViewManager:245 - Creating View Manager 
with storage type :REMOTE_FIRST
   2023-08-07 21:27:48 INFO  FileSystemViewManager:265 - Creating remote first 
table view
   2023-08-07 21:27:48 INFO  BaseHoodieWriteClient:787 - Begin rollback of 
instant 20230807211441479
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:130 - Loading 
HoodieTableMetaClient from file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieTableConfig:268 - Loading table properties 
from file:/opt/warehouse/result.db/aggregation_tbl/.hoodie/hoodie.properties
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:149 - Finished Loading Table 
of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from 
file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieTableMetaClient:152 - Loading Active commit 
timeline for file:///opt/warehouse/result.db/aggregation_tbl
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:171 - Loaded instants upto : 
Option{val=[==>20230807211441479__commit__INFLIGHT]}
   2023-08-07 21:27:48 INFO  FileSystemViewManager:245 - Creating View Manager 
with storage type :REMOTE_FIRST
   2023-08-07 21:27:48 INFO  FileSystemViewManager:265 - Creating remote first 
table view
   2023-08-07 21:27:48 INFO  BaseHoodieWriteClient:796 - Scheduling Rollback at 
instant time : 20230807212748520 (exists in active timeline: true), with 
rollback plan: false
   2023-08-07 21:27:48 INFO  KafkaSourceEnumerator:393 - Discovered new 
partitions: [ChangeRecord-0]
   2023-08-07 21:27:48 INFO  KafkaSourceEnumerator:353 - Assigning splits to 
readers {0=[[Partition: ChangeRecord-0, StartingOffset: -2, StoppingOffset: 
-9223372036854775808]]}
   2023-08-07 21:27:48 INFO  SourceReaderBase:235 - Adding split(s) to reader: 
[[Partition: ChangeRecord-0, StartingOffset: -2, StoppingOffset: 
-9223372036854775808]]
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:171 - Loaded instants upto : 
Option{val=[==>20230807212748520__rollback__REQUESTED]}
   2023-08-07 21:27:48 INFO  BaseRollbackPlanActionExecutor:118 - Requesting 
Rollback with instant time [==>20230807212748520__rollback__REQUESTED]
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:171 - Loaded instants upto : 
Option{val=[==>20230807212748520__rollback__REQUESTED]}
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:632 - Checking for file 
exists 
?file:/opt/warehouse/result.db/aggregation_tbl/.hoodie/20230807212748520.rollback.requested
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:640 - Create new file for 
toInstant 
?file:/opt/warehouse/result.db/aggregation_tbl/.hoodie/20230807212748520.rollback.inflight
   2023-08-07 21:27:48 INFO  CopyOnWriteRollbackActionExecutor:79 - Clean out 
all base files generated for commit: [==>20230807211441479__commit__INFLIGHT]
   2023-08-07 21:27:48 INFO  CopyOnWriteRollbackActionExecutor:85 - Time(in ms) 
taken to finish rollback 4
   2023-08-07 21:27:48 INFO  BaseRollbackActionExecutor:216 - Rolled back 
inflight instant 20230807211441479
   2023-08-07 21:27:48 INFO  BaseRollbackActionExecutor:199 - Index rolled back 
for commits [==>20230807211441479__commit__INFLIGHT]
   2023-08-07 21:27:48 INFO  BaseRollbackActionExecutor:280 - Deleting 
instant=[==>20230807211441479__commit__INFLIGHT]
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:293 - Deleting instant 
[==>20230807211441479__commit__INFLIGHT]
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:298 - Removed instant 
[==>20230807211441479__commit__INFLIGHT]
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:293 - Deleting instant 
[==>20230807211441479__commit__REQUESTED]
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:298 - Removed instant 
[==>20230807211441479__commit__REQUESTED]
   2023-08-07 21:27:48 INFO  BaseRollbackActionExecutor:288 - Deleted pending 
commit [==>20230807211441479__commit__REQUESTED]
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:632 - Checking for file 
exists 
?file:/opt/warehouse/result.db/aggregation_tbl/.hoodie/20230807212748520.rollback.inflight
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:640 - Create new file for 
toInstant 
?file:/opt/warehouse/result.db/aggregation_tbl/.hoodie/20230807212748520.rollback
   2023-08-07 21:27:48 INFO  BaseRollbackActionExecutor:257 - Rollback of 
Commits [20230807211441479] is complete
   2023-08-07 21:27:48 INFO  BaseHoodieWriteClient:842 - Generate a new instant 
time: 20230807212748607 action: commit
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:171 - Loaded instants upto : 
Option{val=[20230807212748520__rollback__COMPLETED]}
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:202 - Creating a new instant 
[==>20230807212748607__commit__REQUESTED]
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:632 - Checking for file 
exists 
?file:/opt/warehouse/result.db/aggregation_tbl/.hoodie/20230807212748607.commit.requested
   2023-08-07 21:27:48 INFO  HoodieActiveTimeline:640 - Create new file for 
toInstant 
?file:/opt/warehouse/result.db/aggregation_tbl/.hoodie/20230807212748607.inflight
   2023-08-07 21:27:48 INFO  StreamWriteOperatorCoordinator:383 - Create 
instant [20230807212748607] for table [aggregation_tbl] with type 
[COPY_ON_WRITE]
   2023-08-07 21:27:48 INFO  StreamWriteOperatorCoordinator:131 - Executor 
executes action [initialize instant ] success!
   ```
   
   

