gabrywu opened a new issue, #28985:
URL: https://github.com/apache/beam/issues/28985
### What happened?
I'm resubmitting a job on the Flink runner with `savepointPath` specified; however, the Flink job does not appear to restore the Kafka offsets from that savepoint. The Kafka consumer config reported in the logs is:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [xxxxxxxxxxx]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-xxxxxxxxxx_consumer-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = xxxxxxxxxxx_consumer
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 524288
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = class xxxx
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = xxxxxxx
security.protocol = SASL_PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm =
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
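For reference, a minimal sketch (my assumption of the shape of the pipeline, not the actual code) of a KafkaIO read whose consumer would end up with a config like the dump above; the brokers, topic, class and transform names are placeholders, and the SASL properties are omitted:

```java
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class KafkaReadSketch {
  public static void main(String[] args) {
    // Runner and savepoint options are taken from the command line.
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(
        "ReadFromKafka",
        KafkaIO.<byte[], byte[]>read()
            .withBootstrapServers("broker1:9092") // placeholder for the real brokers
            .withTopic("my-topic")                // placeholder topic name
            .withKeyDeserializer(ByteArrayDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            // Matches the consumer properties visible in the log dump above.
            .withConsumerConfigUpdates(
                Map.<String, Object>of(
                    ConsumerConfig.GROUP_ID_CONFIG, "xxxxxxxxxxx_consumer",
                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
                    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)));

    p.run();
  }
}
```

If the savepointed reader state is not restored, a consumer configured like this would presumably fall back to the group's committed offsets (or earliest if none), which would be consistent with the differing first-record offsets shown below.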
When I resubmit the job with the same savepointPath, it uses different offsets; the KafkaUnboundedReader logs from the two runs report different first-record offsets:
2023-10-13 07:02:30,510 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedReader [] - Reader-0: first record offset 4937805209
2023-10-13 07:02:30,913 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedReader [] - Reader-2: first record offset 4972666796
2023-10-13 07:02:28,442 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedReader [] - Reader-1: first record offset 4961970747
2023-10-13 06:53:34,907 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedReader [] - Reader-1: first record offset 4961177710
2023-10-13 06:53:36,643 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedReader [] - Reader-2: first record offset 4972664643
2023-10-13 06:53:34,788 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedReader [] - Reader-0: first record offset 4937802838
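For context, the resubmission presumably passes the savepoint through the Flink runner's `savepointPath` pipeline option, roughly as in the sketch below (the savepoint URI and class name are placeholders, not the actual submission; an equivalent restore can also be done with `flink run -s <savepointPath>` for a packaged job):

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class SavepointOptionsSketch {
  public static void main(String[] args) {
    // Placeholder savepoint URI; the real value is whatever was reported when
    // the savepoint was taken for the previous run of the job.
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(
                "--runner=FlinkRunner",
                "--savepointPath=hdfs:///flink/savepoints/savepoint-xxxx")
            .as(FlinkPipelineOptions.class);

    // With these options the restarted job is expected to restore the reader
    // state from the savepoint, so the KafkaUnboundedReader instances should
    // resume from the savepointed offsets rather than from the consumer
    // group's committed offsets.
    System.out.println("Restoring from: " + options.getSavepointPath());
  }
}
```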
### Issue Priority
Priority: 1 (data loss / total loss of function)
### Issue Components
- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [X] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner