scwhittle commented on code in PR #32986:
URL: https://github.com/apache/beam/pull/32986#discussion_r1866394445
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -215,20 +233,329 @@ private ReadFromKafkaDoFn(
* must run clean up tasks when {@link #teardown()} is called.
*/
private static final class SharedStateHolder {
-
- private static final Map<Long, LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>>
- OFFSET_ESTIMATOR_CACHE = new ConcurrentHashMap<>();
private static final Map<Long, LoadingCache<KafkaSourceDescriptor, AverageRecordSize>>
AVG_RECORD_SIZE_CACHE = new ConcurrentHashMap<>();
+ private static final Map<
+ Long, LoadingCache<Optional<ImmutableSet<String>>, ConsumerExecutionContext>>
Review Comment:
comment on keys of map and loading cache
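For example, something along these lines (a sketch only; I'm inferring the key semantics from how the caches are populated elsewhere in this PR, so correct the wording if the Long key means something else):

```java
// Sketch of the requested documentation. Assumption: the Long key is the unique id
// assigned to each ReadFromKafkaDoFn instance that shares this static state.
private static final Map<
        Long, LoadingCache<Optional<ImmutableSet<String>>, ConsumerExecutionContext>>
    CONSUMER_EXECUTION_CONTEXT_CACHE = new ConcurrentHashMap<>();
// Inner cache key: the descriptor's bootstrap servers (Optional.empty() when the
// descriptor doesn't override them), so descriptors for the same cluster share one
// ConsumerExecutionContext.
```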
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -215,20 +233,329 @@ private ReadFromKafkaDoFn(
* must run clean up tasks when {@link #teardown()} is called.
*/
private static final class SharedStateHolder {
-
- private static final Map<Long, LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>>
- OFFSET_ESTIMATOR_CACHE = new ConcurrentHashMap<>();
private static final Map<Long, LoadingCache<KafkaSourceDescriptor, AverageRecordSize>>
AVG_RECORD_SIZE_CACHE = new ConcurrentHashMap<>();
+ private static final Map<
+ Long, LoadingCache<Optional<ImmutableSet<String>>, ConsumerExecutionContext>>
+ CONSUMER_EXECUTION_CONTEXT_CACHE = new ConcurrentHashMap<>();
+ }
+
+ static final class TopicPartitionPollState implements AutoCloseable {
+ private static final List<ConsumerRecord<byte[], byte[]>> CLOSED_SENTINEL = Arrays.asList();
+
+ private final AtomicBoolean closed;
+ private final LinkedTransferQueue<List<ConsumerRecord<byte[], byte[]>>> queue;
+ private final TopicPartition topicPartition;
+ private final OffsetRange offsetRange;
+
+ TopicPartitionPollState(final TopicPartition topicPartition, final OffsetRange offsetRange) {
+ this.closed = new AtomicBoolean();
+ this.queue = new LinkedTransferQueue<>();
+ this.topicPartition = topicPartition;
+ this.offsetRange = offsetRange;
+ }
+
+ TopicPartition getTopicPartition() {
+ return this.topicPartition;
+ }
+
+ OffsetRange getOffsetRange() {
+ return this.offsetRange;
+ }
+
+ boolean hasWaitingConsumer() {
+ return !this.closed.get() && queue.hasWaitingConsumer();
+ }
+
+ Optional<List<ConsumerRecord<byte[], byte[]>>> take() throws InterruptedException {
+ if (this.closed.get()) {
+ return Optional.empty();
+ }
+ return Optional.of(queue.take()).filter(records -> records != CLOSED_SENTINEL);
Review Comment:
this could block forever if there are multiple threads calling take(), as only one would get the sentinel.
If that isn't expected usage, some comments on usage would be good.
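One option (a sketch, assuming multiple takers should be supported) is to re-offer the sentinel so every blocked thread eventually observes it:

```java
Optional<List<ConsumerRecord<byte[], byte[]>>> take() throws InterruptedException {
  if (this.closed.get()) {
    return Optional.empty();
  }
  final List<ConsumerRecord<byte[], byte[]>> records = queue.take();
  if (records == CLOSED_SENTINEL) {
    // Put the sentinel back so any other thread blocked in take() also wakes up.
    queue.put(CLOSED_SENTINEL);
    return Optional.empty();
  }
  return Optional.of(records);
}
```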
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConcurrentConsumer.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Phaser;
+import java.util.function.Supplier;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.AtomicLongMap;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ConcurrentConsumer<K, V> implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(ConcurrentConsumer.class);
+
+ private final ConsumerPhaser phaser;
+ private final Consumer<K, V> consumer;
+ private final Duration pollDuration;
+ private final AtomicLongMap<TopicPartition> times;
+ private final AtomicLongMap<TopicPartition> positions;
+ private final Supplier<Metric> recordsLagMax;
+ private final Map<TopicPartition, Supplier<Metric>> partitionRecordsLag;
+ private ConsumerRecords<K, V> pollResult;
+ private Map<TopicPartition, OffsetAndTimestamp> offsetsForTimesResult;
+
+ private final class ConsumerPhaser extends Phaser {
+ @Override
+ protected boolean onAdvance(final int phase, final int registeredParties) {
+ try {
+ final Map<TopicPartition, Long> positionsView = positions.asMap();
+ final Set<TopicPartition> prevAssignment = consumer.assignment();
+ final Set<TopicPartition> nextAssignment = positionsView.keySet();
+
+ if (!times.isEmpty()) {
+ offsetsForTimesResult = consumer.offsetsForTimes(times.asMap());
+ times.clear();
+ }
+
+ if (!prevAssignment.equals(nextAssignment)) {
+ consumer.assign(nextAssignment);
+ }
+
+ positionsView.forEach(
+ (tp, o) -> {
+ if (o == Long.MIN_VALUE) {
+ consumer.pause(Collections.singleton(tp));
+ } else if (!prevAssignment.contains(tp)) {
+ consumer.seek(tp, o);
+ }
+ });
+
+ if (consumer.paused().size() != nextAssignment.size()) {
+ pollResult = consumer.poll(pollDuration.toMillis());
+ }
+
+ nextAssignment.forEach(tp -> positions.put(tp, consumer.position(tp)));
+ return false;
+ } catch (WakeupException e) {
+ if (!this.isTerminated()) {
+ LOG.error("Unexpected wakeup while running", e);
+ }
+ } catch (Exception e) {
+ LOG.error("Exception while reading from Kafka", e);
+ }
+ return true;
+ }
+ }
+
+ ConcurrentConsumer(final Consumer<K, V> consumer, final Duration pollDuration) {
+ this.phaser = new ConsumerPhaser();
+ this.consumer = consumer;
+ this.pollDuration = pollDuration;
+ this.times = AtomicLongMap.create();
+ this.positions = AtomicLongMap.create();
+ this.recordsLagMax =
+ Suppliers.memoize(
+ () ->
+ this.consumer.metrics().values().stream()
+ .filter(
+ m ->
+ "consumer-fetch-manager-metrics".equals(m.metricName().group())
+ && "records-lag-max".equals(m.metricName().name())
+ && !m.metricName().tags().containsKey("topic")
+ && !m.metricName().tags().containsKey("partition"))
+ .findAny()
+ .get());
+ this.partitionRecordsLag = new ConcurrentHashMap<>();
+ this.pollResult = ConsumerRecords.empty();
+ this.offsetsForTimesResult = Collections.emptyMap();
+ }
+
+ @Override
+ public void close() {
+ this.phaser.forceTermination();
+ try {
+ this.consumer.wakeup();
+ this.consumer.close();
+ } catch (Exception e) {
+ LOG.error("Exception while closing Kafka consumer", e);
+ }
+ this.times.clear();
+ this.positions.clear();
+ this.pollResult = ConsumerRecords.empty();
+ this.offsetsForTimesResult = Collections.emptyMap();
+ }
+
+ boolean isClosed() {
+ return this.phaser.isTerminated();
+ }
+
+ Map<MetricName, ? extends Metric> metrics() {
+ return this.consumer.metrics();
+ }
+
+ long currentLagOrMaxLag(final TopicPartition topicPartition) {
+ final Supplier<Metric> metric =
+ this.partitionRecordsLag.getOrDefault(topicPartition, this.recordsLagMax);
+ try {
+ return ((Number) metric.get().metricValue()).longValue();
+ } catch (Exception e) {
+ return 0;
+ }
+ }
+
+ long position(final TopicPartition topicPartition) {
+ return this.positions.get(topicPartition) & Long.MAX_VALUE;
+ }
+
+ long initialOffsetForPartition(final TopicPartition topicPartition) {
+ // Offsets start at 0 and there is no position to advance to beyond Long.MAX_VALUE.
+ // The sign bit indicates that an assignment is paused.
+ checkState(this.phaser.register() >= 0);
+ this.positions.put(topicPartition, Long.MIN_VALUE);
+
+ // Synchronize and throw if the consumer was terminated in between.
+ checkState(this.phaser.arriveAndAwaitAdvance() >= 0);
+
+ // Removal will revoke the assignment when the phase advances.
+ final long result = this.positions.remove(topicPartition);
+
+ // Synchronize and ignore the consumer status since the result is already known.
+ this.phaser.arriveAndDeregister();
+
+ // Since Long.MIN_VALUE only has the sign bit set, this will return 0 as a default value if no
+ // position could be determined.
+ return result & Long.MAX_VALUE;
+ }
+
+ @Nullable
+ OffsetAndTimestamp initialOffsetForTime(final TopicPartition topicPartition, final long time) {
+ // Request the offset closest to the provided time.
+ checkState(this.phaser.register() >= 0);
+ this.times.put(topicPartition, time);
+
+ // Synchronize and throw if the consumer was terminated in between.
+ checkState(this.phaser.arriveAndAwaitAdvance() >= 0);
+ final Map<TopicPartition, OffsetAndTimestamp> result = this.offsetsForTimesResult;
+
+ // Synchronize and ignore the consumer status since the result is already known.
+ this.phaser.arriveAndDeregister();
+
+ return result.get(topicPartition);
+ }
+
+ void assignAndSeek(final TopicPartition topicPartition, final long offset) {
+ checkState(this.phaser.register() >= 0);
+
+ this.positions.put(topicPartition, offset);
+ this.partitionRecordsLag.computeIfAbsent(
+ topicPartition,
+ k ->
+ Suppliers.memoize(
Review Comment:
similar concern here: what if metrics aren't updated, or the metric is missing on the original call?
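A sketch of one way to avoid latching a failed lookup (lookupPartitionLagMetric is a hypothetical helper returning Optional<Metric>):

```java
// Re-resolve until the metric exists instead of memoizing the first attempt.
private static Supplier<Optional<Metric>> retryingMetricSupplier(
    final Supplier<Optional<Metric>> lookupPartitionLagMetric) {
  // java.util.concurrent.atomic.AtomicReference holds the successful lookup, if any.
  final AtomicReference<Optional<Metric>> cached = new AtomicReference<>(Optional.empty());
  return () -> cached.updateAndGet(m -> m.isPresent() ? m : lookupPartitionLagMetric.get());
}
```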
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConcurrentConsumer.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Phaser;
+import java.util.function.Supplier;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.AtomicLongMap;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ConcurrentConsumer<K, V> implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(ConcurrentConsumer.class);
+
+ private final ConsumerPhaser phaser;
+ private final Consumer<K, V> consumer;
+ private final Duration pollDuration;
+ private final AtomicLongMap<TopicPartition> times;
+ private final AtomicLongMap<TopicPartition> positions;
+ private final Supplier<Metric> recordsLagMax;
+ private final Map<TopicPartition, Supplier<Metric>> partitionRecordsLag;
+ private ConsumerRecords<K, V> pollResult;
+ private Map<TopicPartition, OffsetAndTimestamp> offsetsForTimesResult;
+
+ private final class ConsumerPhaser extends Phaser {
+ @Override
+ protected boolean onAdvance(final int phase, final int registeredParties) {
+ try {
+ final Map<TopicPartition, Long> positionsView = positions.asMap();
+ final Set<TopicPartition> prevAssignment = consumer.assignment();
+ final Set<TopicPartition> nextAssignment = positionsView.keySet();
+
+ if (!times.isEmpty()) {
+ offsetsForTimesResult = consumer.offsetsForTimes(times.asMap());
+ times.clear();
+ }
+
+ if (!prevAssignment.equals(nextAssignment)) {
+ consumer.assign(nextAssignment);
+ }
+
+ positionsView.forEach(
+ (tp, o) -> {
+ if (o == Long.MIN_VALUE) {
+ consumer.pause(Collections.singleton(tp));
+ } else if (!prevAssignment.contains(tp)) {
+ consumer.seek(tp, o);
+ }
+ });
+
+ if (consumer.paused().size() != nextAssignment.size()) {
+ pollResult = consumer.poll(pollDuration.toMillis());
+ }
+
+ nextAssignment.forEach(tp -> positions.put(tp, consumer.position(tp)));
+ return false;
+ } catch (WakeupException e) {
+ if (!this.isTerminated()) {
+ LOG.error("Unexpected wakeup while running", e);
+ }
+ } catch (Exception e) {
+ LOG.error("Exception while reading from Kafka", e);
+ }
+ return true;
+ }
+ }
+
+ ConcurrentConsumer(final Consumer<K, V> consumer, final Duration pollDuration) {
+ this.phaser = new ConsumerPhaser();
+ this.consumer = consumer;
+ this.pollDuration = pollDuration;
+ this.times = AtomicLongMap.create();
+ this.positions = AtomicLongMap.create();
+ this.recordsLagMax =
+ Suppliers.memoize(
+ () ->
+ this.consumer.metrics().values().stream()
+ .filter(
+ m ->
+ "consumer-fetch-manager-metrics".equals(m.metricName().group())
+ && "records-lag-max".equals(m.metricName().name())
+ && !m.metricName().tags().containsKey("topic")
+ && !m.metricName().tags().containsKey("partition"))
+ .findAny()
+ .get());
+ this.partitionRecordsLag = new ConcurrentHashMap<>();
+ this.pollResult = ConsumerRecords.empty();
+ this.offsetsForTimesResult = Collections.emptyMap();
+ }
+
+ @Override
+ public void close() {
+ this.phaser.forceTermination();
+ try {
+ this.consumer.wakeup();
+ this.consumer.close();
+ } catch (Exception e) {
+ LOG.error("Exception while closing Kafka consumer", e);
+ }
+ this.times.clear();
+ this.positions.clear();
+ this.pollResult = ConsumerRecords.empty();
+ this.offsetsForTimesResult = Collections.emptyMap();
+ }
+
+ boolean isClosed() {
+ return this.phaser.isTerminated();
+ }
+
+ Map<MetricName, ? extends Metric> metrics() {
+ return this.consumer.metrics();
+ }
+
+ long currentLagOrMaxLag(final TopicPartition topicPartition) {
+ final Supplier<Metric> metric =
+ this.partitionRecordsLag.getOrDefault(topicPartition, this.recordsLagMax);
+ try {
+ return ((Number) metric.get().metricValue()).longValue();
Review Comment:
handle null explicitly without exception?
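For example (a sketch; the memoized supplier can still throw if the metric was never created, and records-lag-max may report NaN before any fetch):

```java
long currentLagOrMaxLag(final TopicPartition topicPartition) {
  final Supplier<Metric> metric =
      this.partitionRecordsLag.getOrDefault(topicPartition, this.recordsLagMax);
  final Metric m;
  try {
    m = metric.get(); // may still throw if the metric was never registered
  } catch (RuntimeException e) {
    return 0L;
  }
  final Object value = m.metricValue();
  if (value instanceof Number) {
    final double d = ((Number) value).doubleValue();
    return Double.isNaN(d) ? 0L : (long) d;
  }
  return 0L; // null or non-numeric metric value: report zero lag instead of throwing
}
```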
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConcurrentConsumer.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Phaser;
+import java.util.function.Supplier;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.AtomicLongMap;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ConcurrentConsumer<K, V> implements AutoCloseable {
Review Comment:
An overview comment would be helpful
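Something along these lines, for example (a sketch based on my reading of the class; please correct anything I've gotten wrong):

```java
/**
 * Shares a single Kafka {@link Consumer} among many concurrent callers.
 *
 * <p>Callers register with an internal {@link Phaser} and publish the work they need done
 * (positions to seek to, partitions to pause, timestamps to resolve) into concurrent maps.
 * When all registered parties arrive, {@code onAdvance} runs on a single thread: it
 * reconciles the assignment, applies seeks and pauses, issues one poll, and publishes the
 * results (records, positions, offsets-for-times) for callers to read after the advance.
 * Phaser termination doubles as the closed signal.
 */
final class ConcurrentConsumer<K, V> implements AutoCloseable {
```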
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConcurrentConsumer.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Phaser;
+import java.util.function.Supplier;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.AtomicLongMap;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ConcurrentConsumer<K, V> implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(ConcurrentConsumer.class);
+
+ private final ConsumerPhaser phaser;
+ private final Consumer<K, V> consumer;
+ private final Duration pollDuration;
+ private final AtomicLongMap<TopicPartition> times;
+ private final AtomicLongMap<TopicPartition> positions;
+ private final Supplier<Metric> recordsLagMax;
+ private final Map<TopicPartition, Supplier<Metric>> partitionRecordsLag;
+ private ConsumerRecords<K, V> pollResult;
+ private Map<TopicPartition, OffsetAndTimestamp> offsetsForTimesResult;
+
+ private final class ConsumerPhaser extends Phaser {
+ @Override
+ protected boolean onAdvance(final int phase, final int registeredParties) {
+ try {
+ final Map<TopicPartition, Long> positionsView = positions.asMap();
+ final Set<TopicPartition> prevAssignment = consumer.assignment();
+ final Set<TopicPartition> nextAssignment = positionsView.keySet();
+
+ if (!times.isEmpty()) {
+ offsetsForTimesResult = consumer.offsetsForTimes(times.asMap());
+ times.clear();
+ }
+
+ if (!prevAssignment.equals(nextAssignment)) {
+ consumer.assign(nextAssignment);
+ }
+
+ positionsView.forEach(
+ (tp, o) -> {
+ if (o == Long.MIN_VALUE) {
+ consumer.pause(Collections.singleton(tp));
+ } else if (!prevAssignment.contains(tp)) {
+ consumer.seek(tp, o);
+ }
+ });
+
+ if (consumer.paused().size() != nextAssignment.size()) {
+ pollResult = consumer.poll(pollDuration.toMillis());
+ }
+
+ nextAssignment.forEach(tp -> positions.put(tp, consumer.position(tp)));
+ return false;
+ } catch (WakeupException e) {
+ if (!this.isTerminated()) {
+ LOG.error("Unexpected wakeup while running", e);
+ }
+ } catch (Exception e) {
+ LOG.error("Exception while reading from Kafka", e);
+ }
+ return true;
+ }
+ }
+
+ ConcurrentConsumer(final Consumer<K, V> consumer, final Duration pollDuration) {
+ this.phaser = new ConsumerPhaser();
+ this.consumer = consumer;
+ this.pollDuration = pollDuration;
+ this.times = AtomicLongMap.create();
+ this.positions = AtomicLongMap.create();
+ this.recordsLagMax =
+ Suppliers.memoize(
Review Comment:
I don't see guarantees in the Kafka interface that the same metric object is updated, or about when the metric would be created.
Perhaps it would be better to have some expiration here? Or at least don't cache a null if we attempt to memoize before the metric has been created in the consumer?
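For example (a sketch; the field would become a Supplier<Optional<Metric>>, java.util.concurrent.TimeUnit would need importing, and the 1-minute expiry is arbitrary):

```java
this.recordsLagMax =
    Suppliers.memoizeWithExpiration(
        () ->
            this.consumer.metrics().values().stream()
                .filter(
                    m ->
                        "consumer-fetch-manager-metrics".equals(m.metricName().group())
                            && "records-lag-max".equals(m.metricName().name())
                            && !m.metricName().tags().containsKey("topic")
                            && !m.metricName().tags().containsKey("partition"))
                // Optional.empty() when the metric doesn't exist yet; the lookup is
                // retried after expiry instead of caching the miss forever.
                .findAny(),
        1,
        TimeUnit.MINUTES);
```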
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConcurrentConsumer.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Phaser;
+import java.util.function.Supplier;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.AtomicLongMap;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ConcurrentConsumer<K, V> implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(ConcurrentConsumer.class);
+
+ private final ConsumerPhaser phaser;
+ private final Consumer<K, V> consumer;
+ private final Duration pollDuration;
+ private final AtomicLongMap<TopicPartition> times;
+ private final AtomicLongMap<TopicPartition> positions;
+ private final Supplier<Metric> recordsLagMax;
+ private final Map<TopicPartition, Supplier<Metric>> partitionRecordsLag;
+ private ConsumerRecords<K, V> pollResult;
+ private Map<TopicPartition, OffsetAndTimestamp> offsetsForTimesResult;
+
+ private final class ConsumerPhaser extends Phaser {
+ @Override
+ protected boolean onAdvance(final int phase, final int registeredParties) {
+ try {
+ final Map<TopicPartition, Long> positionsView = positions.asMap();
+ final Set<TopicPartition> prevAssignment = consumer.assignment();
+ final Set<TopicPartition> nextAssignment = positionsView.keySet();
+
+ if (!times.isEmpty()) {
+ offsetsForTimesResult = consumer.offsetsForTimes(times.asMap());
+ times.clear();
+ }
+
+ if (!prevAssignment.equals(nextAssignment)) {
+ consumer.assign(nextAssignment);
+ }
+
+ positionsView.forEach(
+ (tp, o) -> {
+ if (o == Long.MIN_VALUE) {
+ consumer.pause(Collections.singleton(tp));
+ } else if (!prevAssignment.contains(tp)) {
+ consumer.seek(tp, o);
+ }
+ });
+
+ if (consumer.paused().size() != nextAssignment.size()) {
+ pollResult = consumer.poll(pollDuration.toMillis());
+ }
+
+ nextAssignment.forEach(tp -> positions.put(tp, consumer.position(tp)));
+ return false;
+ } catch (WakeupException e) {
+ if (!this.isTerminated()) {
+ LOG.error("Unexpected wakeup while running", e);
+ }
+ } catch (Exception e) {
+ LOG.error("Exception while reading from Kafka", e);
+ }
+ return true;
+ }
+ }
+
+ ConcurrentConsumer(final Consumer<K, V> consumer, final Duration pollDuration) {
+ this.phaser = new ConsumerPhaser();
+ this.consumer = consumer;
+ this.pollDuration = pollDuration;
+ this.times = AtomicLongMap.create();
+ this.positions = AtomicLongMap.create();
+ this.recordsLagMax =
+ Suppliers.memoize(
+ () ->
+ this.consumer.metrics().values().stream()
+ .filter(
+ m ->
+ "consumer-fetch-manager-metrics".equals(m.metricName().group())
+ && "records-lag-max".equals(m.metricName().name())
+ && !m.metricName().tags().containsKey("topic")
+ && !m.metricName().tags().containsKey("partition"))
+ .findAny()
+ .get());
+ this.partitionRecordsLag = new ConcurrentHashMap<>();
+ this.pollResult = ConsumerRecords.empty();
+ this.offsetsForTimesResult = Collections.emptyMap();
+ }
+
+ @Override
+ public void close() {
+ this.phaser.forceTermination();
+ try {
+ this.consumer.wakeup();
+ this.consumer.close();
+ } catch (Exception e) {
+ LOG.error("Exception while closing Kafka consumer", e);
+ }
+ this.times.clear();
+ this.positions.clear();
+ this.pollResult = ConsumerRecords.empty();
+ this.offsetsForTimesResult = Collections.emptyMap();
+ }
+
+ boolean isClosed() {
+ return this.phaser.isTerminated();
+ }
+
+ Map<MetricName, ? extends Metric> metrics() {
+ return this.consumer.metrics();
+ }
+
+ long currentLagOrMaxLag(final TopicPartition topicPartition) {
+ final Supplier<Metric> metric =
+ this.partitionRecordsLag.getOrDefault(topicPartition, this.recordsLagMax);
+ try {
+ return ((Number) metric.get().metricValue()).longValue();
+ } catch (Exception e) {
+ return 0;
+ }
+ }
+
+ long position(final TopicPartition topicPartition) {
+ return this.positions.get(topicPartition) & Long.MAX_VALUE;
Review Comment:
comment on the purpose of the &; it looks like it protects against exposing a negative value?
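i.e. something like:

```java
long position(final TopicPartition topicPartition) {
  // The sign bit is used by initialOffsetForPartition to mark a paused assignment, and
  // AtomicLongMap returns 0 for absent keys; masking with Long.MAX_VALUE clears the
  // sign bit so callers never observe a negative position.
  return this.positions.get(topicPartition) & Long.MAX_VALUE;
}
```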
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConcurrentConsumer.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Phaser;
+import java.util.function.Supplier;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.AtomicLongMap;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ConcurrentConsumer<K, V> implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(ConcurrentConsumer.class);
+
+ private final ConsumerPhaser phaser;
+ private final Consumer<K, V> consumer;
+ private final Duration pollDuration;
+ private final AtomicLongMap<TopicPartition> times;
Review Comment:
what kind of time? a better variable name could clarify.
Also comment on the keys; ditto for positions.
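For example (hypothetical names, assuming these are the epoch-millis timestamps handed to offsetsForTimes; adjust to the actual intent):

```java
// Epoch-millis timestamps, keyed by partition, pending resolution via offsetsForTimes().
private final AtomicLongMap<TopicPartition> pendingOffsetsForTimesMs;
// Next fetch position, keyed by assigned partition; Long.MIN_VALUE marks a paused
// assignment (see initialOffsetForPartition).
private final AtomicLongMap<TopicPartition> positions;
```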
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConcurrentConsumer.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Phaser;
+import java.util.function.Supplier;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.AtomicLongMap;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ConcurrentConsumer<K, V> implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(ConcurrentConsumer.class);
+
+ private final ConsumerPhaser phaser;
+ private final Consumer<K, V> consumer;
+ private final Duration pollDuration;
+ private final AtomicLongMap<TopicPartition> times;
+ private final AtomicLongMap<TopicPartition> positions;
+ private final Supplier<Metric> recordsLagMax;
+ private final Map<TopicPartition, Supplier<Metric>> partitionRecordsLag;
+ private ConsumerRecords<K, V> pollResult;
+ private Map<TopicPartition, OffsetAndTimestamp> offsetsForTimesResult;
+
+ private final class ConsumerPhaser extends Phaser {
+ @Override
+ protected boolean onAdvance(final int phase, final int registeredParties) {
+ try {
+ final Map<TopicPartition, Long> positionsView = positions.asMap();
+ final Set<TopicPartition> prevAssignment = consumer.assignment();
+ final Set<TopicPartition> nextAssignment = positionsView.keySet();
+
+ if (!times.isEmpty()) {
+ offsetsForTimesResult = consumer.offsetsForTimes(times.asMap());
+ times.clear();
+ }
+
+ if (!prevAssignment.equals(nextAssignment)) {
+ consumer.assign(nextAssignment);
+ }
+
+ positionsView.forEach(
+ (tp, o) -> {
+ if (o == Long.MIN_VALUE) {
+ consumer.pause(Collections.singleton(tp));
Review Comment:
would it be better to just pause once by building up all partitions you want
to pause?
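e.g. a sketch of batching it into a single call:

```java
final Set<TopicPartition> toPause = new HashSet<>();
positionsView.forEach(
    (tp, o) -> {
      if (o == Long.MIN_VALUE) {
        toPause.add(tp);
      } else if (!prevAssignment.contains(tp)) {
        consumer.seek(tp, o);
      }
    });
if (!toPause.isEmpty()) {
  consumer.pause(toPause); // one pause call for all paused partitions
}
```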
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -215,20 +233,329 @@ private ReadFromKafkaDoFn(
* must run clean up tasks when {@link #teardown()} is called.
*/
private static final class SharedStateHolder {
-
- private static final Map<Long, LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>>
- OFFSET_ESTIMATOR_CACHE = new ConcurrentHashMap<>();
private static final Map<Long, LoadingCache<KafkaSourceDescriptor, AverageRecordSize>>
AVG_RECORD_SIZE_CACHE = new ConcurrentHashMap<>();
+ private static final Map<
+ Long, LoadingCache<Optional<ImmutableSet<String>>, ConsumerExecutionContext>>
+ CONSUMER_EXECUTION_CONTEXT_CACHE = new ConcurrentHashMap<>();
+ }
+
+ static final class TopicPartitionPollState implements AutoCloseable {
+ private static final List<ConsumerRecord<byte[], byte[]>> CLOSED_SENTINEL = Arrays.asList();
+
+ private final AtomicBoolean closed;
+ private final LinkedTransferQueue<List<ConsumerRecord<byte[], byte[]>>> queue;
+ private final TopicPartition topicPartition;
+ private final OffsetRange offsetRange;
+
+ TopicPartitionPollState(final TopicPartition topicPartition, final OffsetRange offsetRange) {
+ this.closed = new AtomicBoolean();
+ this.queue = new LinkedTransferQueue<>();
+ this.topicPartition = topicPartition;
+ this.offsetRange = offsetRange;
+ }
+
+ TopicPartition getTopicPartition() {
+ return this.topicPartition;
+ }
+
+ OffsetRange getOffsetRange() {
+ return this.offsetRange;
+ }
+
+ boolean hasWaitingConsumer() {
+ return !this.closed.get() && queue.hasWaitingConsumer();
+ }
+
+ Optional<List<ConsumerRecord<byte[], byte[]>>> take() throws InterruptedException {
Review Comment:
this looks similar to sdks/java/core/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java. Should we unify?
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -215,20 +233,329 @@ private ReadFromKafkaDoFn(
* must run clean up tasks when {@link #teardown()} is called.
*/
private static final class SharedStateHolder {
-
- private static final Map<Long, LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>>
- OFFSET_ESTIMATOR_CACHE = new ConcurrentHashMap<>();
private static final Map<Long, LoadingCache<KafkaSourceDescriptor, AverageRecordSize>>
AVG_RECORD_SIZE_CACHE = new ConcurrentHashMap<>();
+ private static final Map<
+ Long, LoadingCache<Optional<ImmutableSet<String>>, ConsumerExecutionContext>>
+ CONSUMER_EXECUTION_CONTEXT_CACHE = new ConcurrentHashMap<>();
+ }
+
+ static final class TopicPartitionPollState implements AutoCloseable {
+ private static final List<ConsumerRecord<byte[], byte[]>> CLOSED_SENTINEL = Arrays.asList();
Review Comment:
nit: maybe ImmutableList.of() would make it clearer that this isn't to be modified
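i.e. (using the vendored Guava ImmutableList):

```java
private static final List<ConsumerRecord<byte[], byte[]>> CLOSED_SENTINEL = ImmutableList.of();
```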
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -267,80 +284,121 @@ private static final class SharedStateHolder {
private static class KafkaLatestOffsetEstimator
implements GrowableOffsetRangeTracker.RangeEndEstimator {
- private final Consumer<byte[], byte[]> offsetConsumer;
- private final TopicPartition topicPartition;
- private final Supplier<Long> memoizedBacklog;
+ private final KafkaSourceDescriptor sourceDescriptor;
+ private final LoadingCache<Optional<ImmutableSet<String>>, ConcurrentConsumer<byte[], byte[]>>
+ consumerExecutionContextCache;
+ private @MonotonicNonNull ConcurrentConsumer<byte[], byte[]> consumerExecutionContextInstance;
KafkaLatestOffsetEstimator(
- Consumer<byte[], byte[]> offsetConsumer, TopicPartition topicPartition) {
- this.offsetConsumer = offsetConsumer;
- this.topicPartition = topicPartition;
- memoizedBacklog =
- Suppliers.memoizeWithExpiration(
- () -> {
- synchronized (offsetConsumer) {
- return Preconditions.checkStateNotNull(
- offsetConsumer
- .endOffsets(Collections.singleton(topicPartition))
- .get(topicPartition),
- "No end offset found for partition %s.",
- topicPartition);
- }
- },
- 1,
- TimeUnit.SECONDS);
+ final KafkaSourceDescriptor sourceDescriptor,
+ final LoadingCache<Optional<ImmutableSet<String>>, ConcurrentConsumer<byte[], byte[]>>
+ consumerExecutionContextCache) {
+ this.sourceDescriptor = sourceDescriptor;
+ this.consumerExecutionContextCache = consumerExecutionContextCache;
+ this.consumerExecutionContextInstance = null;
}
@Override
- protected void finalize() {
+ public long estimate() {
+ Optional<ImmutableSet<String>> consumerExecutionContextKey =
+ Optional.ofNullable(this.sourceDescriptor.getBootStrapServers())
+ .map(ImmutableSet::copyOf);
+ ConcurrentConsumer<byte[], byte[]> consumerExecutionContext;
try {
- Closeables.close(offsetConsumer, true);
- LOG.info("Offset Estimator consumer was closed for {}", topicPartition);
- } catch (Exception anyException) {
- LOG.warn("Failed to close offset consumer for {}", topicPartition);
+ consumerExecutionContext =
+ this.consumerExecutionContextCache.get(consumerExecutionContextKey);
+ } catch (ExecutionException ex) {
+ return -1L;
}
- }
+ this.consumerExecutionContextInstance = consumerExecutionContext;
- @Override
- public long estimate() {
- return memoizedBacklog.get();
+ final long position =
+ this.consumerExecutionContextInstance.position(this.sourceDescriptor.getTopicPartition());
Review Comment:
use the local consumerExecutionContext instead of this.consumerExecutionContextInstance
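i.e.:

```java
this.consumerExecutionContextInstance = consumerExecutionContext;
final long position =
    consumerExecutionContext.position(this.sourceDescriptor.getTopicPartition());
```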
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -267,80 +284,121 @@ private static final class SharedStateHolder {
private static class KafkaLatestOffsetEstimator
implements GrowableOffsetRangeTracker.RangeEndEstimator {
- private final Consumer<byte[], byte[]> offsetConsumer;
- private final TopicPartition topicPartition;
- private final Supplier<Long> memoizedBacklog;
+ private final KafkaSourceDescriptor sourceDescriptor;
+ private final LoadingCache<Optional<ImmutableSet<String>>, ConcurrentConsumer<byte[], byte[]>>
+ consumerExecutionContextCache;
+ private @MonotonicNonNull ConcurrentConsumer<byte[], byte[]> consumerExecutionContextInstance;
Review Comment:
This doesn't appear to be used to avoid lookups in the cache. Should it be
removed or should the logic be changed?
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConcurrentConsumer.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Phaser;
+import java.util.function.Supplier;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.AtomicLongMap;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ConcurrentConsumer<K, V> implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(ConcurrentConsumer.class);
+
+ private final ConsumerPhaser phaser;
+ private final Consumer<K, V> consumer;
+ private final Duration pollDuration;
+ private final AtomicLongMap<TopicPartition> times;
+ private final AtomicLongMap<TopicPartition> positions;
+ private final Supplier<Metric> recordsLagMax;
+ private final Map<TopicPartition, Supplier<Metric>> partitionRecordsLag;
+ private ConsumerRecords<K, V> pollResult;
+ private Map<TopicPartition, OffsetAndTimestamp> offsetsForTimesResult;
+
+ private final class ConsumerPhaser extends Phaser {
+ @Override
+ protected boolean onAdvance(final int phase, final int registeredParties) {
+ try {
+ final Map<TopicPartition, Long> positionsView = positions.asMap();
+ final Set<TopicPartition> prevAssignment = consumer.assignment();
+ final Set<TopicPartition> nextAssignment = positionsView.keySet();
+
+ if (!times.isEmpty()) {
+ offsetsForTimesResult = consumer.offsetsForTimes(times.asMap());
+ times.clear();
+ }
+
+ if (!prevAssignment.equals(nextAssignment)) {
+ consumer.assign(nextAssignment);
+ }
+
+ positionsView.forEach(
+ (tp, o) -> {
+ if (o == Long.MIN_VALUE) {
+ consumer.pause(Collections.singleton(tp));
+ } else if (!prevAssignment.contains(tp)) {
+ consumer.seek(tp, o);
+ }
+ });
+
+ if (consumer.paused().size() != nextAssignment.size()) {
+ pollResult = consumer.poll(pollDuration.toMillis());
+ }
+
+ nextAssignment.forEach(tp -> positions.put(tp, consumer.position(tp)));
+ return false;
+ } catch (WakeupException e) {
+ if (!this.isTerminated()) {
+ LOG.error("Unexpected wakeup while running", e);
+ }
+ } catch (Exception e) {
+ LOG.error("Exception while reading from Kafka", e);
+ }
+ return true;
+ }
+ }
+
+ ConcurrentConsumer(final Consumer<K, V> consumer, final Duration pollDuration) {
+ this.phaser = new ConsumerPhaser();
+ this.consumer = consumer;
+ this.pollDuration = pollDuration;
+ this.times = AtomicLongMap.create();
+ this.positions = AtomicLongMap.create();
+ this.recordsLagMax =
+ Suppliers.memoize(
+ () ->
+ this.consumer.metrics().values().stream()
+ .filter(
+ m ->
+ "consumer-fetch-manager-metrics".equals(m.metricName().group())
+ && "records-lag-max".equals(m.metricName().name())
+ && !m.metricName().tags().containsKey("topic")
+ && !m.metricName().tags().containsKey("partition"))
+ .findAny()
+ .get());
+ this.partitionRecordsLag = new ConcurrentHashMap<>();
+ this.pollResult = ConsumerRecords.empty();
+ this.offsetsForTimesResult = Collections.emptyMap();
+ }
+
+ @Override
+ public void close() {
+ this.phaser.forceTermination();
+ try {
+ this.consumer.wakeup();
+ this.consumer.close();
+ } catch (Exception e) {
+ LOG.error("Exception while closing Kafka consumer", e);
+ }
+ this.times.clear();
+ this.positions.clear();
+ this.pollResult = ConsumerRecords.empty();
+ this.offsetsForTimesResult = Collections.emptyMap();
+ }
+
+ boolean isClosed() {
+ return this.phaser.isTerminated();
+ }
+
+ Map<MetricName, ? extends Metric> metrics() {
+ return this.consumer.metrics();
+ }
+
+ long currentLagOrMaxLag(final TopicPartition topicPartition) {
+ final Supplier<Metric> metric =
+ this.partitionRecordsLag.getOrDefault(topicPartition, this.recordsLagMax);
+ try {
+ return ((Number) metric.get().metricValue()).longValue();
+ } catch (Exception e) {
+ return 0;
+ }
+ }
+
+ long position(final TopicPartition topicPartition) {
+ return this.positions.get(topicPartition) & Long.MAX_VALUE;
+ }
+
+ long initialOffsetForPartition(final TopicPartition topicPartition) {
+ // Offsets start at 0 and there is no position to advance to beyond Long.MAX_VALUE.
+ // The sign bit indicates that an assignment is paused.
+ checkState(this.phaser.register() >= 0);
+ this.positions.put(topicPartition, Long.MIN_VALUE);
+
+ // Synchronize and throw if the consumer was terminated in between.
+ checkState(this.phaser.arriveAndAwaitAdvance() >= 0);
+
+ // Removal will revoke the assignment when the phase advances.
+ final long result = this.positions.remove(topicPartition);
+
+ // Synchronize and ignore the consumer status since the result is already known.
+ this.phaser.arriveAndDeregister();
+
+ // Since Long.MIN_VALUE only has the sign bit set, this will return 0 as a default value if no
+ // position could be determined.
+ return result & Long.MAX_VALUE;
+ }
+
+ @Nullable
+ OffsetAndTimestamp initialOffsetForTime(final TopicPartition topicPartition, final long time) {
Review Comment:
what are the units for time? Can Instant be used here and converted to long internally?
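For example (a sketch assuming java.time.Instant; Kafka's offsetsForTimes takes epoch-millis timestamps):

```java
@Nullable
OffsetAndTimestamp initialOffsetForTime(final TopicPartition topicPartition, final Instant time) {
  checkState(this.phaser.register() >= 0);
  this.times.put(topicPartition, time.toEpochMilli());
  checkState(this.phaser.arriveAndAwaitAdvance() >= 0);
  final Map<TopicPartition, OffsetAndTimestamp> result = this.offsetsForTimesResult;
  this.phaser.arriveAndDeregister();
  return result.get(topicPartition);
}
```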
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]