AHeise commented on code in PR #152:
URL: 
https://github.com/apache/flink-connector-kafka/pull/152#discussion_r1954063870


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/BackchannelImpl.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A backchannel for communication from the Kafka committer to the writer. It is used to recycle
+ * producers and to signal that certain transactions have been committed on recovery.
+ */
+@Internal
+@ThreadSafe
+public class BackchannelImpl<T> implements Backchannel<T> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BackchannelImpl.class);
+
+    /**
+     * The map of backchannels, keyed by the subtask id and transactional id 
prefix to uniquely
+     * identify the backchannel while establishing the connection.
+     */
+    @GuardedBy("this")
+    private static final Map<Tuple2<Integer, String>, BackchannelImpl<?>> 
BACKCHANNELS =
+            new HashMap<>();
+    /** The key to identify the backchannel (subtask id, transactional id 
prefix). */
+    private final Tuple2<Integer, String> key;
+    /**
+     * The writer thread that sends messages to the committer. Used to ensure 
that we switch to a
+     * concurrent queue when the writer and committer are not chained.
+     */
+    private Thread writerThread;
+    /**
+     * The committer thread that consumes messages from the writer. Used to 
ensure that we switch to
+     * a concurrent queue when the writer and committer are not chained.
+     */
+    private Thread committerThread;
+    /**
+     * The messages to be sent from the writer to the committer. By default, 
it's a thread-safe
+     * implementation but it can be replaced with an unsynchronized queue when 
the writer and
+     * committer are chained (common case).
+     */
+    private Deque<T> messages = new ConcurrentLinkedDeque<>();
+    /** Flag to indicate that the backchannel has been established. */
+    private boolean established;
+
    /**
     * Creates an unestablished backchannel for the given (subtask id, transactional id prefix)
     * key. Package-private on purpose: instances are obtained through {@code create}, which
     * registers the calling side.
     */
    BackchannelImpl(Tuple2<Integer, String> key) {
        this.key = key;
    }
+
+    @Override
+    public void send(T message) {
+        messages.add(message);
+    }
+
+    @Override
+    public void consume(Consumer<T> consumer) {
+        ensureEstablished();
+        T message;
+        while ((message = pollUnsafe()) != null) {
+            consumer.accept(message);
+        }
+    }
+
+    @Nullable
+    private T pollUnsafe() {
+        return messages.poll();
+    }
+
+    @VisibleForTesting
+    @Nullable
+    public T poll() {
+        ensureEstablished();
+        return pollUnsafe();
+    }
+
    /**
     * Re-runs the registration handshake for this channel's key via {@code create}.
     *
     * <p>NOTE(review): this side is treated as the writer iff no writer thread has registered yet
     * ({@code writerThread == null}) — presumably establish() is only called from the
     * not-yet-registered side; confirm against register(), which is not visible here.
     */
    @VisibleForTesting
    @Override
    public BackchannelImpl<T> establish() {
        return create(key.f0, key.f1, writerThread == null);
    }
+
+    /**
+     * Create a backchannel for the given subtask id and transactional id 
prefix. The backchannel is
+     * created if it does not exist, otherwise, the existing backchannel is 
returned.
+     *
+     * <p>If this method is called twice with the same subtask id and 
transactional id prefix but
+     * different forWriter values, the backchannel will be established and can 
be used for
+     * communication.
+     *
+     * <p>If this method is called twice with the same subtask id and 
transactional id prefix and
+     * the same forWriter value, an error will be logged instead. There is a 
high chance that the
+     * backchannel is misused by using non-uniques transactionalIds.
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> BackchannelImpl<T> create(
+            int subtaskId, String transactionalIdPrefix, boolean forWriter) {
+        Tuple2<Integer, String> key = new Tuple2<>(subtaskId, 
transactionalIdPrefix);
+        BackchannelImpl<T> backchannel;
+        boolean duplicate;
+        synchronized (BACKCHANNELS) {
+            backchannel =
+                    (BackchannelImpl<T>)
+                            BACKCHANNELS.computeIfAbsent(key, k -> new 
BackchannelImpl<>(key));
+            duplicate = backchannel.isDuplicate(forWriter);
+            backchannel.register(forWriter);
+        }
+        if (duplicate) {
+            LOG.error(
+                    "Found duplicate transactionalIdPrefix for multiple Kafka 
sinks: {}. Transactional id prefixes need to be unique. You may experience 
memory leaks without fixing this.",
+                    transactionalIdPrefix);
+        }
+        return backchannel;
+    }
+
+    private boolean isDuplicate(boolean forWriter) {
+        return forWriter ? writerThread != null : committerThread != null;
+    }
+
+    private void ensureEstablished() {
+        if (established) {
+            return;
+        }
+
+        // Most of the fields are only properly initialized by the second side 
of register (writer
+        // or committer).
+        // This method briefly enters the critical section to ensure that all 
fields are visible to
+        // the first side.
+        boolean established;
+        synchronized (BACKCHANNELS) {
+            established = this.established;
+        }

Review Comment:
   But if it's a one-time switch, should we pay the penalty of a volatile 
read every time?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to