Fengt-en opened a new issue, #9340:
URL: https://github.com/apache/seatunnel/issues/9340

   ### Search before asking
   
   - [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
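   When the job is started in earliest mode, it does not consume the historical data from the beginning, and the program does not fail either. If new data is written while the job is running, that new data is consumed.
   I therefore believe that earliest mode currently uses the same consumption strategy as latest. In the source code, LsnOffsetFactory contains:
       @Override
       public Offset earliest() {
           return LsnOffset.INITIAL_OFFSET;
       }
   So earliest mode returns INITIAL_OFFSET, and LsnOffset defines it as:
       public static final LsnOffset INITIAL_OFFSET = new LsnOffset(Lsn.INVALID_LSN.asLong(), null, Instant.MIN);
   This means the lsn is set to 0.
   In PostgreSQL, however, the slot's restart_lsn is clearly no longer 0; it is the most recently updated value.
   As a result, earliest mode does not appear to be usable at present. Could you reply on whether this feature will be improved later? Please also let me know if my reasoning is wrong.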
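
To make the suspected mismatch concrete, here is a minimal, hedged sketch that compares a requested start LSN of 0 with a slot's restart_lsn. `LsnSketch`, `parseLsn`, and `isBehindSlot` are hypothetical helpers written for this illustration; they are not SeaTunnel or Debezium classes. The sample value `0/7F6396F8` is only the hexadecimal form of 2137233144 from the commented-out `startup.specific-offset.pos` in the config, assumed here to stand in for the slot's restart_lsn. The real value can be read with `SELECT restart_lsn, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = 'ft_slot2';`.

```java
// Sketch only: LsnSketch is a hypothetical helper, not part of SeaTunnel or Debezium.
final class LsnSketch {

    /** Parses PostgreSQL's textual LSN form "hi/lo" (two hex numbers) into a 64-bit value. */
    static long parseLsn(String text) {
        String[] parts = text.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected LSN in the form X/Y, got: " + text);
        }
        long hi = Long.parseUnsignedLong(parts[0], 16);
        long lo = Long.parseUnsignedLong(parts[1], 16);
        return (hi << 32) | lo;
    }

    /** Returns true when the requested start position is older than the slot's restart_lsn. */
    static boolean isBehindSlot(long requestedLsn, long restartLsn) {
        return Long.compareUnsigned(requestedLsn, restartLsn) < 0;
    }

    public static void main(String[] args) {
        // Value this report attributes to LsnOffset.INITIAL_OFFSET (Lsn.INVALID_LSN.asLong()).
        long requested = 0L;
        // Assumed example restart_lsn; replace with the value from pg_replication_slots.
        long restart = parseLsn("0/7F6396F8");
        System.out.println("restart_lsn as long: " + restart);
        System.out.println("requested LSN is behind the slot: " + isBehindSlot(requested, restart));
    }
}
```

If the check reports that the requested LSN is behind the slot, the slot no longer points at the position that earliest asks for. That would be consistent with the behaviour described above, though it does not by itself prove where the connector decides to start reading.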
   
   ### SeaTunnel Version
   
   2.3.9
   
   ### SeaTunnel Config
   
   ```conf
   env{
     job.mode="STREAMING"
     parallelism = 1
     execution.checkpoint.interval="15000"
     execution.checkpoint.timeout="15000"
     checkpoint.interval="15000"
     checkpoint.timeout="15000"
     read_limit.bytes_per_second= 2147483647
     read_limit.rows_per_second= 10000
     job.name= "FT_P_G_0407"
     custom_parameters=[  ]
   }
   source{
    Postgres-CDC {
        base-url ="jdbc:postgresql://localhost:5432/test"
        username = "postgres"
        password = "123456"
        slot.name = "ft_slot2"
        database-names = ["test"]
        schema-names = ["ft_schema"]
        table-names = ["test.ft_schema.ft_test_01"]
        table-names-config = [{"table": "test.ft_schema.ft_test_01", "primaryKeys": ["id"]}]
        plugin_output ="ItegrationInput_2113529222"
        startup.mode ="earliest"
       #startup.mode = "specific"
       #startup.specific-offset.pos="2137233144"
       #startup.specific-offset={lsn=2137233144,ts_usec=-9223372036854775808,txId=59617}
       #startup.specific-offset={lsn=2137233144,ts_usec=1747694892000,txId=59617}
   }
   }
   transform{
   
   }
   sink{
   
   Console{}
   }
   ```
   
   ### Running Command
   
   ```shell
   /
   ```
   
   ### Error Exception
   
   ```log
   /
   ```
   
   ### Zeta or Flink or Spark Version
   
   flink 1.15
   
   ### Java or Scala Version
   
   1.8
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

