gabrywu opened a new issue, #28985:
URL: https://github.com/apache/beam/issues/28985

   ### What happened?
   
   I'm resubmitting a job on the Flink runner with a savepointPath specified; however, the Flink job does not appear to use the offsets from that savepoint. The Kafka consumer configuration logged at startup is:
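For context, the resubmission looks roughly like the following. This is a hedged sketch: the savepoint path, jar name, and pipeline options are placeholders, not the actual job's values.

```shell
# Hypothetical resubmission of the Beam-on-Flink job from a savepoint.
# "-s" is the standard Flink CLI flag for restoring from a savepoint;
# all paths and names below are illustrative placeholders.
flink run \
  -s hdfs:///savepoints/savepoint-abc123 \
  my-beam-pipeline.jar \
  --runner=FlinkRunner \
  --streaming
```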
   
       allow.auto.create.topics = true
       auto.commit.interval.ms = 5000
       auto.offset.reset = earliest
       bootstrap.servers = [xxxxxxxxxxx]
       check.crcs = true
       client.dns.lookup = use_all_dns_ips
       client.id = consumer-xxxxxxxxxx_consumer-1
       client.rack =
       connections.max.idle.ms = 540000
       default.api.timeout.ms = 60000
       enable.auto.commit = true
       exclude.internal.topics = true
       fetch.max.bytes = 52428800
       fetch.max.wait.ms = 500
       fetch.min.bytes = 1
       group.id = xxxxxxxxxxx_consumer
       group.instance.id = null
       heartbeat.interval.ms = 3000
       interceptor.classes = []
       internal.leave.group.on.close = true
       internal.throw.on.fetch.stable.offset.unsupported = false
       isolation.level = read_uncommitted
       key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
       max.partition.fetch.bytes = 1048576
       max.poll.interval.ms = 300000
       max.poll.records = 500
       metadata.max.age.ms = 300000
       metric.reporters = []
       metrics.num.samples = 2
       metrics.recording.level = INFO
       metrics.sample.window.ms = 30000
       partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
       receive.buffer.bytes = 524288
       reconnect.backoff.max.ms = 1000
       reconnect.backoff.ms = 50
       request.timeout.ms = 30000
       retry.backoff.ms = 100
       sasl.client.callback.handler.class = null
       sasl.jaas.config = [hidden]
       sasl.kerberos.kinit.cmd = /usr/bin/kinit
       sasl.kerberos.min.time.before.relogin = 60000
       sasl.kerberos.service.name = null
       sasl.kerberos.ticket.renew.jitter = 0.05
       sasl.kerberos.ticket.renew.window.factor = 0.8
       sasl.login.callback.handler.class = null
       sasl.login.class = class xxxx
       sasl.login.refresh.buffer.seconds = 300
       sasl.login.refresh.min.period.seconds = 60
       sasl.login.refresh.window.factor = 0.8
       sasl.login.refresh.window.jitter = 0.05
       sasl.mechanism = xxxxxxx
       security.protocol = SASL_PLAINTEXT
       security.providers = null
       send.buffer.bytes = 131072
       session.timeout.ms = 10000
       socket.connection.setup.timeout.max.ms = 30000
       socket.connection.setup.timeout.ms = 10000
       ssl.cipher.suites = null
       ssl.enabled.protocols = [TLSv1.2]
       ssl.endpoint.identification.algorithm =
       ssl.engine.factory.class = null
       ssl.key.password = null
       ssl.keymanager.algorithm = SunX509
       ssl.keystore.certificate.chain = null
       ssl.keystore.key = null
       ssl.keystore.location = null
       ssl.keystore.password = null
       ssl.keystore.type = JKS
       ssl.protocol = TLSv1.2
       ssl.provider = null
       ssl.secure.random.implementation = null
       ssl.trustmanager.algorithm = PKIX
       ssl.truststore.certificates = null
       ssl.truststore.location = null
       ssl.truststore.password = null
       ssl.truststore.type = JKS
       value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
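Note that the configuration has `enable.auto.commit = true` with a fixed `group.id`. One way to check whether the restarted job picked up the group's committed offsets instead of the savepoint state is to describe the consumer group with the standard Kafka tool (a sketch; the bootstrap servers and group id are the redacted values above):

```shell
# Inspect the committed offsets for the consumer group.
# Substitute the real bootstrap servers and group id (redacted above).
kafka-consumer-groups.sh \
  --bootstrap-server xxxxxxxxxxx \
  --describe \
  --group xxxxxxxxxxx_consumer
```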
   
   When I resubmit the job with the same savepointPath, it uses different offsets:
   
   2023-10-13 07:02:30,510 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedReader            [] - Reader-0: first record offset 4937805209
   2023-10-13 07:02:30,913 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedReader            [] - Reader-2: first record offset 4972666796
   2023-10-13 07:02:28,442 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedReader            [] - Reader-1: first record offset 4961970747
   2023-10-13 06:53:34,907 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedReader            [] - Reader-1: first record offset 4961177710
   2023-10-13 06:53:36,643 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedReader            [] - Reader-2: first record offset 4972664643
   2023-10-13 06:53:34,788 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedReader            [] - Reader-0: first record offset 4937802838
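Comparing the two runs directly (offsets copied from the log lines above), every reader in the 07:02 submission started after the position of the 06:53 submission rather than at a fixed savepoint position, which would be consistent with the offsets coming from the committed consumer group instead of the restored savepoint:

```python
# First-record offsets from the two submissions, copied from the logs above.
run_0653 = {"Reader-0": 4937802838, "Reader-1": 4961177710, "Reader-2": 4972664643}
run_0702 = {"Reader-0": 4937805209, "Reader-1": 4961970747, "Reader-2": 4972666796}

# Per-reader difference between the second and the first submission.
deltas = {reader: run_0702[reader] - run_0653[reader] for reader in run_0653}
print(deltas)  # every delta is positive: the second run did not rewind
```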
   
   ### Issue Priority
   
   Priority: 1 (data loss / total loss of function)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [X] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
