kirktrue commented on code in PR #12672:
URL: https://github.com/apache/kafka/pull/12672#discussion_r976889155


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerBackgroundThread.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ConsumerRequestEvent;
+import org.apache.kafka.clients.consumer.internals.events.ConsumerResponseEvent;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * The background thread runs in the background and consumes the {@code ApplicationEvent} from the {@link org.apache.kafka.clients.consumer.KafkaConsumer} APIs. This class uses an event loop to drive the following important tasks:
+ * <ul>
+ *  <li>Consuming and executing the {@code ApplicationEvent}.</li>
+ *  <li>Maintaining the connection to the coordinator.</li>
+ *  <li>Sending heartbeat.</li>
+ *  <li>Autocommitting.</li>
+ *  <li>Executing the rebalance flow.</li>
+ * </ul>
+ */
+public class ConsumerBackgroundThread extends KafkaThread implements AutoCloseable {
+    private static final String CONSUMER_BACKGROUND_THREAD_PREFIX = "consumer_background_thread";
+
+    private final BlockingQueue<ConsumerRequestEvent> consumerRequestEvent;
+    private final BlockingQueue<ConsumerResponseEvent> consumerResponseEvent;
+    private Time time;

Review Comment:
   Nit-picky, but this can and probably should be `final` too.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerBackgroundThread.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ConsumerRequestEvent;
+import org.apache.kafka.clients.consumer.internals.events.ConsumerResponseEvent;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * The background thread runs in the background and consumes the {@code ApplicationEvent} from the {@link org.apache.kafka.clients.consumer.KafkaConsumer} APIs. This class uses an event loop to drive the following important tasks:
+ * <ul>
+ *  <li>Consuming and executing the {@code ApplicationEvent}.</li>
+ *  <li>Maintaining the connection to the coordinator.</li>
+ *  <li>Sending heartbeat.</li>
+ *  <li>Autocommitting.</li>
+ *  <li>Executing the rebalance flow.</li>
+ * </ul>
+ */
+public class ConsumerBackgroundThread extends KafkaThread implements AutoCloseable {

Review Comment:
   IIRC, `AutoCloseable` is usually for cases of "try-with-resources." Is that 
how it will be used?
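
For reference, a minimal sketch of what try-with-resources usage could look like for a thread that implements `AutoCloseable`. `ClosableWorker` and `TryWithResourcesSketch` are hypothetical stand-ins, not the classes in this PR, and the shutdown steps in `close()` (set a flag, interrupt, bounded join) are an assumption about one reasonable design:

```java
// Hypothetical stand-in for a background thread that can be closed.
final class ClosableWorker extends Thread implements AutoCloseable {
    private volatile boolean closed = false;

    ClosableWorker() {
        super("closable-worker-sketch");
        setDaemon(true);
    }

    @Override
    public void run() {
        while (!closed) {
            try {
                Thread.sleep(1); // placeholder for real event-loop work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return; // interrupted by close(): leave the loop
            }
        }
    }

    // Narrowed from AutoCloseable.close(): declares no checked exception.
    @Override
    public void close() {
        closed = true;  // ask the loop to exit
        interrupt();    // wake the thread if it is sleeping
        try {
            join(1000); // bounded wait for the loop to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isClosed() {
        return closed;
    }
}

final class TryWithResourcesSketch {
    // Starts the worker, lets try-with-resources call close(), and reports
    // whether the worker was flagged closed and has stopped running.
    static boolean runAndClose() {
        ClosableWorker worker = new ClosableWorker();
        try (ClosableWorker resource = worker) {
            resource.start();
        }
        return worker.isClosed() && !worker.isAlive();
    }
}
```

If the thread is instead owned by a long-lived object that shuts it down from its own `close()`, the interface still works, but the try-with-resources benefit does not really apply.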



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerBackgroundThread.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ConsumerRequestEvent;
+import org.apache.kafka.clients.consumer.internals.events.ConsumerResponseEvent;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * The background thread runs in the background and consumes the {@code ApplicationEvent} from the {@link org.apache.kafka.clients.consumer.KafkaConsumer} APIs. This class uses an event loop to drive the following important tasks:
+ * <ul>
+ *  <li>Consuming and executing the {@code ApplicationEvent}.</li>
+ *  <li>Maintaining the connection to the coordinator.</li>
+ *  <li>Sending heartbeat.</li>
+ *  <li>Autocommitting.</li>
+ *  <li>Executing the rebalance flow.</li>
+ * </ul>
+ */
+public class ConsumerBackgroundThread extends KafkaThread implements AutoCloseable {
+    private static final String CONSUMER_BACKGROUND_THREAD_PREFIX = "consumer_background_thread";
+
+    private final BlockingQueue<ConsumerRequestEvent> consumerRequestEvent;
+    private final BlockingQueue<ConsumerResponseEvent> consumerResponseEvent;
+    private Time time;
+
+    // control variables
+    private volatile boolean closed = false;
+
+    public ConsumerBackgroundThread(BlockingQueue<ConsumerRequestEvent> consumerRequestEvents,

Review Comment:
   The constructor logic seems a bit reversed from what I'd expect.
   
   I would imagine the three-arg version would be:
   
   ```
   super(CONSUMER_BACKGROUND_THREAD_PREFIX, true);
   this.time = time;
   this.consumerRequestEvent = consumerRequestEvents;
   this.consumerResponseEvent = consumerResponseEvents;
   ```
   
   And then the two-arg version would simply be:
   
   ```
   this(Time.SYSTEM, consumerRequestEvents, consumerResponseEvents);
   ```
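
Put together, a self-contained sketch of that delegation order, which also lets the time field be `final`. Everything here is a stand-in so the example compiles on its own: `DelegatingWorker` is hypothetical, `java.time.Clock` replaces Kafka's `Time`, and `String` replaces the event types:

```java
import java.time.Clock;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for the thread class; not the PR's implementation.
final class DelegatingWorker extends Thread {
    private final Clock clock; // can be final: assigned exactly once
    private final BlockingQueue<String> requests;
    private final BlockingQueue<String> responses;

    // Convenience constructor: only supplies the default clock.
    DelegatingWorker(BlockingQueue<String> requests,
                     BlockingQueue<String> responses) {
        this(Clock.systemUTC(), requests, responses);
    }

    // Full constructor: the only place where fields are assigned.
    DelegatingWorker(Clock clock,
                     BlockingQueue<String> requests,
                     BlockingQueue<String> responses) {
        super("delegating-worker-sketch");
        setDaemon(true);
        this.clock = clock;
        this.requests = requests;
        this.responses = responses;
    }

    Clock clock() {
        return clock;
    }

    BlockingQueue<String> requests() {
        return requests;
    }

    BlockingQueue<String> responses() {
        return responses;
    }
}
```

With the order used in the PR (the three-arg constructor calling the two-arg one and then reassigning the field), the field could not be declared `final`, because it would be assigned twice.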



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This class interfaces the KafkaConsumer and the background thread.  It allows the caller to enqueue {@link ApplicationEvent}
+ * to be consumed by the background thread and poll {@linkBackgroundEvent} produced by the background thread.
+ */
+public class DefaultEventHandler implements EventHandler {
+    private BlockingQueue<ApplicationEvent> applicationEvents;
+    private BlockingQueue<BackgroundEvent> backgroundEvents;
+
+    public DefaultEventHandler() {
+        this.applicationEvents = new LinkedBlockingQueue<>();
+        this.backgroundEvents = new LinkedBlockingQueue<>();
+        // TODO: a concreted implementation of how requests are being consumed, and how responses are being produced.
+    }
+
+    @Override
+    public BackgroundEvent poll() {
+        return backgroundEvents.poll();

Review Comment:
   This can return `null`, right? 
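
A small sketch of the behavior in question, assuming the backing queue stays a `LinkedBlockingQueue`: the non-blocking `poll()` returns `null` when the queue is empty. `PollSketch` is hypothetical and uses `String` in place of `BackgroundEvent`; the `Optional` wrapper is just one possible way to make the empty case explicit, not something this PR proposes:

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the event handler's polling side.
final class PollSketch {
    private final BlockingQueue<String> backgroundEvents = new LinkedBlockingQueue<>();

    // Mirrors the reviewed method: returns null when nothing is queued.
    String poll() {
        return backgroundEvents.poll();
    }

    // Alternative shape: the empty case is visible in the return type.
    Optional<String> pollOptional() {
        return Optional.ofNullable(backgroundEvents.poll());
    }

    // Test helper standing in for the background thread producing an event.
    void publish(String event) {
        backgroundEvents.add(event);
    }
}
```

Either way, documenting the empty-queue result in the interface Javadoc would help callers.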



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This class interfaces the KafkaConsumer and the background thread.  It allows the caller to enqueue {@link ApplicationEvent}
+ * to be consumed by the background thread and poll {@linkBackgroundEvent} produced by the background thread.
+ */
+public class DefaultEventHandler implements EventHandler {
+    private BlockingQueue<ApplicationEvent> applicationEvents;
+    private BlockingQueue<BackgroundEvent> backgroundEvents;
+
+    public DefaultEventHandler() {
+        this.applicationEvents = new LinkedBlockingQueue<>();
+        this.backgroundEvents = new LinkedBlockingQueue<>();
+        // TODO: a concreted implementation of how requests are being consumed, and how responses are being produced.
+    }
+
+    @Override
+    public BackgroundEvent poll() {
+        return backgroundEvents.poll();
+    }
+
+    @Override
+    public boolean add(ApplicationEvent event) {
+        return applicationEvents.add(event);

Review Comment:
   According to the [JavaDoc for `AbstractQueue`](https://docs.oracle.com/javase/8/docs/api/java/util/AbstractQueue.html#add-E-):
   
   > This implementation returns `true` if `offer` (sic) succeeds, else throws an `IllegalStateException`.
   
   Do we want an exception thrown if the buffer is full? Or do we want the [`offer`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/LinkedBlockingQueue.html#offer-E-) method which returns `false`?
   
   > Inserts the specified element at the tail of this queue if it is possible to do so immediately without exceeding the queue's capacity, returning `true` upon success and `false` if this queue is full. When using a capacity-restricted queue, this method is generally preferable to method [`add`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html#add-E-), which can fail to insert an element only by throwing an exception.
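
To make the difference concrete, a minimal sketch using a capacity-1 `LinkedBlockingQueue` (`AddVersusOfferSketch` is a hypothetical name). Note that the queue in this PR is created with the no-arg constructor, whose capacity is `Integer.MAX_VALUE`, so in practice `add` would only throw here if a bound is introduced later:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class AddVersusOfferSketch {
    // Builds a bounded queue that is already at capacity.
    private static BlockingQueue<String> fullQueue() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.add("first"); // fills the single slot
        return queue;
    }

    // offer() reports a full queue through its return value.
    static boolean offerWhenFull() {
        return fullQueue().offer("second");
    }

    // add() reports a full queue by throwing IllegalStateException.
    static boolean addThrowsWhenFull() {
        try {
            fullQueue().add("second");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }
}
```

Whichever method is chosen, the `boolean` in the `EventHandler` contract should say whether `false` is ever a possible result.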



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerBackgroundThread.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ConsumerRequestEvent;
+import org.apache.kafka.clients.consumer.internals.events.ConsumerResponseEvent;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * The background thread runs in the background and consumes the {@code ApplicationEvent} from the {@link org.apache.kafka.clients.consumer.KafkaConsumer} APIs. This class uses an event loop to drive the following important tasks:
+ * <ul>
+ *  <li>Consuming and executing the {@code ApplicationEvent}.</li>
+ *  <li>Maintaining the connection to the coordinator.</li>
+ *  <li>Sending heartbeat.</li>
+ *  <li>Autocommitting.</li>
+ *  <li>Executing the rebalance flow.</li>
+ * </ul>
+ */
+public class ConsumerBackgroundThread extends KafkaThread implements AutoCloseable {
+    private static final String CONSUMER_BACKGROUND_THREAD_PREFIX = "consumer_background_thread";
+
+    private final BlockingQueue<ConsumerRequestEvent> consumerRequestEvent;
+    private final BlockingQueue<ConsumerResponseEvent> consumerResponseEvent;
+    private Time time;
+
+    // control variables
+    private volatile boolean closed = false;
+
+    public ConsumerBackgroundThread(BlockingQueue<ConsumerRequestEvent> consumerRequestEvents,
+                                    BlockingQueue<ConsumerResponseEvent> consumerResponseEvents) {
+        super(CONSUMER_BACKGROUND_THREAD_PREFIX, true);
+        this.time = Time.SYSTEM;
+        this.consumerRequestEvent = consumerRequestEvents;
+        this.consumerResponseEvent = consumerResponseEvents;
+    }
+
+    public ConsumerBackgroundThread(Time time,
+                                    BlockingQueue<ConsumerRequestEvent> consumerRequestEvents,
+                                    BlockingQueue<ConsumerResponseEvent> consumerResponseEvents) {
+        this(consumerRequestEvents, consumerResponseEvents);
+        this.time = time;
+    }
+
+    @Override
+    public void run() {
+        try {
+            while(!closed) {
+                // TODO: implementation will be added here

Review Comment:
   Magic happens here... 😄 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This class interfaces the KafkaConsumer and the background thread.  It allows the caller to enqueue {@link ApplicationEvent}
+ * to be consumed by the background thread and poll {@linkBackgroundEvent} produced by the background thread.
+ */
+public class DefaultEventHandler implements EventHandler {
+    private BlockingQueue<ApplicationEvent> applicationEvents;

Review Comment:
   These two `BlockingQueue`s can be `final`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventHandler.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals.events;
+
+/**
+ * This class interfaces with the KafkaConsumer and the background thread. It allows the caller to enqueue events via
+ * the {@code add()} method and to retrieve events via the {@code poll()} method.
+ */
+public interface EventHandler {
+    public BackgroundEvent poll();
+    public boolean add(ApplicationEvent event);

Review Comment:
   Can you describe what the `boolean` returned from `add` represents?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

