Github user haohui commented on a diff in the pull request:
https://github.com/apache/flink/pull/3314#discussion_r102299038
--- Diff:
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
keyPayload.get(keyBytes);
}
-    final T value = deserializer.deserialize(keyBytes, valueBytes,
-        currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-    if (deserializer.isEndOfStream(value)) {
-        // remove partition from subscribed partitions.
-        partitionsIterator.remove();
-        continue partitionsLoop;
-    }
-
-    owner.emitRecord(value, currentPartition, offset);
+    final Collector<T> collector = new Collector<T>() {
--- End diff --
Totally agree. I played around with it a bit, and it looks like it will
require some trade-offs.
The problem is that `emitRecord()` needs per-record state (e.g., topic
partition and offset). That state can either be captured in a closure (like
the new `Collector` instance) or passed through arguments. I see three
possibilities here:
1. Create a new instance of `Collector` for every record. The JVM may or
may not be able to optimize the allocation away. A trace-based JVM should be
able to, but I'm not sure about a class-based JVM.
2. Expose the internal state in the `collect()` call, so that `collect()`
takes additional parameters such as the offset and partition state. This
reduces GC overhead but makes it harder to change the implementation later.
3. Create a new interface like `Optional<T> deserialize(byte[] messageKey,
...)` (or `void deserialize(byte[] messageKey, ..., AtomicReference<T> result)`
to optimize away the cost of the `Optional` class). This results in a slightly
more complex API, but it probably offers the best trade-off between
performance and API compatibility.
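To make the three shapes concrete, here is a rough sketch. Everything in it is a simplified stand-in invented for illustration: `Owner`, `Collector`, `StatefulCollector`, and all method names are hypothetical and are not the actual Flink interfaces. Records are plain strings, and a `null` payload stands in for a message that the deserializer decides not to emit.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-ins: none of these types are Flink's real interfaces.
final class DeserializeOptionsSketch {

    interface Collector<T> { void collect(T element); }

    // Option 2 variant: per-record state is part of the collect() signature.
    interface StatefulCollector<T> { void collect(T element, int partition, long offset); }

    // Stand-in for the fetcher that owns emitRecord(); it just logs what was emitted.
    static final class Owner {
        final List<String> emitted = new ArrayList<>();
        void emitRecord(String value, int partition, long offset) {
            emitted.add(value + "@" + partition + ":" + offset);
        }
    }

    static String decode(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    // Option 1: a new collector per record; partition and offset live in the closure.
    static void perRecordCollector(Owner owner, byte[][] messages, int partition) {
        for (int i = 0; i < messages.length; i++) {
            final long offset = i;
            Collector<String> collector = v -> owner.emitRecord(v, partition, offset);
            String value = decode(messages[i]);
            if (value != null) collector.collect(value);
        }
    }

    // Option 2: one collector for the whole loop; state travels as arguments.
    static void statefulCollector(Owner owner, byte[][] messages, int partition) {
        StatefulCollector<String> collector = owner::emitRecord;
        for (int i = 0; i < messages.length; i++) {
            String value = decode(messages[i]);
            if (value != null) collector.collect(value, partition, i);
        }
    }

    // Option 3a: the deserializer returns Optional<T>; the caller emits.
    static void optionalResult(Owner owner, byte[][] messages, int partition) {
        for (int i = 0; i < messages.length; i++) {
            Optional<String> value = Optional.ofNullable(decode(messages[i]));
            if (value.isPresent()) owner.emitRecord(value.get(), partition, i);
        }
    }

    // Option 3b: a reusable out-parameter avoids allocating an Optional per record.
    static void outParameter(Owner owner, byte[][] messages, int partition) {
        AtomicReference<String> result = new AtomicReference<>();
        for (int i = 0; i < messages.length; i++) {
            result.set(decode(messages[i])); // plays the role of deserialize(..., result)
            String value = result.get();
            if (value != null) owner.emitRecord(value, partition, i);
        }
    }

    public static void main(String[] args) {
        byte[][] messages = {
            "a".getBytes(StandardCharsets.UTF_8), null, "b".getBytes(StandardCharsets.UTF_8)
        };
        Owner o1 = new Owner(); perRecordCollector(o1, messages, 0);
        Owner o2 = new Owner(); statefulCollector(o2, messages, 0);
        Owner o3 = new Owner(); optionalResult(o3, messages, 0);
        Owner o4 = new Owner(); outParameter(o4, messages, 0);
        System.out.println(o1.emitted + " " + o2.emitted + " " + o3.emitted + " " + o4.emitted);
    }
}
```

All four variants emit the same records (`[a@0:0, b@0:2]` for the sample input). They differ only in where the per-record state lives and in what gets allocated per record, which is the trade-off in question.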
What do you think?