mynameborat commented on a change in pull request #1351:
URL: https://github.com/apache/samza/pull/1351#discussion_r415279758



##########
File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
##########
@@ -170,9 +171,18 @@ Throwable getFailureCause() {
     return failureCause;
   }
 
-  private void initializeLags() {
+  @VisibleForTesting
+  void initializeLags() {
     // This is expensive, so only do it once at the beginning. After the first poll, we can rely on metrics for lag.
-    Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitionToSSP.keySet());
+
+    Map<TopicPartition, Long> endOffsets;
+    // Synchronize, in case the consumer is used in some other thread (metadata or something else)
+    synchronized (kafkaConsumer) {
+      endOffsets = kafkaConsumer.endOffsets(topicPartitionToSSP.keySet());
+    }
+    if (endOffsets == null) {
+      throw new SamzaException("Failed to fetch kafka consumer endoffsets for system " + systemName);

Review comment:
       I think it's a good idea to fail with a meaningful error instead of a potential NPE. However, is this new contract necessary as part of this change?
   1. Is it possible for `kafkaConsumer.endOffsets(...)` to return null in the first place?
   2. If so, have we seen NPE issues here before, or is this purely defensive programming?

##########
File path: 
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaConsumerProxy.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.Test;
+
+import static org.mockito.Mockito.anyCollection;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+
+
+public class TestKafkaConsumerProxy {
+
+  @Test
+  public void testSynchronizedAccessOfKafkaConsumer() throws Exception {

Review comment:
       I see we don't have any tests for the consumer proxy, so thanks for adding one :) However, I have a few observations/questions:
   1. The test is still nondeterministic and doesn't always test synchronized access as it claims, since t1 and t2 may run one after the other without ever contending for the lock.
   2. Is the intent here to test Java's `synchronized`, or to guard against accidental changes to `initializeLags` that would make it stop synchronizing on the consumer?
   - If it is the former, it will be hard to test consistently and deterministically without some complex tricks using latches and `Thread.holdsLock(...)`. IMO, it is okay to assume Java's `synchronized` works as expected.
   - If it is the latter, it is still hard to test, but we can enforce it through other means: explicit comments around the code blocks and diligent reviews.
   
   TL;DR: I'd rather not have a test for this scenario than have one that is nondeterministic in what it tests. If you prefer to keep a test, it would be nice for it to do what it claims.
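If a test is kept, one deterministic option along the lines of the `Thread.holdsLock(...)` suggestion is to check, from inside the fake consumer's `endOffsets` call, whether the calling thread owns the consumer's monitor. This needs no second thread, so there is no scheduling nondeterminism. The sketch below is plain Java with hypothetical `FakeConsumer`, `SynchronizedProxy` and `UnsynchronizedProxy` classes rather than the real `KafkaConsumerProxy`; in the actual test the same check could presumably sit inside a Mockito `Answer` stubbed onto the mocked `KafkaConsumer`.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Hypothetical classes illustrating a deterministic Thread.holdsLock(...) check; not Samza code.
public class HoldsLockCheck {

  // Fake consumer that records whether its caller held this object's monitor.
  static class FakeConsumer {
    boolean lockHeldDuringCall;

    Map<String, Long> endOffsets(Set<String> partitions) {
      // True only if the current thread owns this object's monitor right now.
      lockHeldDuringCall = Thread.holdsLock(this);
      return Collections.singletonMap("topic-0", 42L);
    }
  }

  // Mirrors the shape of initializeLags() in the diff: synchronizes on the consumer.
  static class SynchronizedProxy {
    private final FakeConsumer consumer;

    SynchronizedProxy(FakeConsumer consumer) {
      this.consumer = consumer;
    }

    Map<String, Long> initializeLags() {
      synchronized (consumer) {
        return consumer.endOffsets(Collections.singleton("topic-0"));
      }
    }
  }

  // Negative control: the pre-change shape, with no synchronization.
  static class UnsynchronizedProxy {
    private final FakeConsumer consumer;

    UnsynchronizedProxy(FakeConsumer consumer) {
      this.consumer = consumer;
    }

    Map<String, Long> initializeLags() {
      return consumer.endOffsets(Collections.singleton("topic-0"));
    }
  }

  // Single-threaded, so the result does not depend on thread scheduling.
  public static boolean synchronizedProxyHoldsLock() {
    FakeConsumer consumer = new FakeConsumer();
    new SynchronizedProxy(consumer).initializeLags();
    return consumer.lockHeldDuringCall;
  }

  public static boolean unsynchronizedProxyHoldsLock() {
    FakeConsumer consumer = new FakeConsumer();
    new UnsynchronizedProxy(consumer).initializeLags();
    return consumer.lockHeldDuringCall;
  }

  public static void main(String[] args) {
    System.out.println("synchronized proxy holds lock: " + synchronizedProxyHoldsLock());
    System.out.println("unsynchronized proxy holds lock: " + unsynchronizedProxyHoldsLock());
  }
}
```

The unsynchronized variant acts as a negative control: removing the `synchronized (kafkaConsumer)` block would flip the equivalent assertion. Note that this verifies the lock is held during the call, which addresses the "accidental change" concern; mutual exclusion itself is left to Java's guarantee.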
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
