divijvaidya commented on code in PR #13572:
URL: https://github.com/apache/kafka/pull/13572#discussion_r1170213078


##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1893,6 +1893,33 @@ class SocketServerTest {
     }, false)
   }
 
+  @Test
+  def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+    val acceptor = server.dataPlaneAcceptor(listener)
+    val channel = acceptor.get.serverChannel
+    verifySocketUsesReuseAddress(channel)
+  }
+
+  @Test
+  def testControlPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+    shutdownServerAndMetrics(server)
+    val testProps = new Properties
+    testProps ++= props
+    testProps.put("listeners", 
"PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")

Review Comment:
   please add a comment here.
   
   We are using 0 so that OS can choose to associated any available port here.



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1893,6 +1893,33 @@ class SocketServerTest {
     }, false)
   }
 
+  @Test
+  def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+    val acceptor = server.dataPlaneAcceptor(listener)
+    val channel = acceptor.get.serverChannel
+    verifySocketUsesReuseAddress(channel)
+  }
+
+  @Test
+  def testControlPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+    shutdownServerAndMetrics(server)
+    val testProps = new Properties
+    testProps ++= props
+    testProps.put("listeners", 
"PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
+    testProps.put("listener.security.protocol.map", 
"PLAINTEXT:PLAINTEXT,CONTROL_PLANE:PLAINTEXT")
+    testProps.put("control.plane.listener.name", "CONTROL_PLANE")
+    val config = KafkaConfig.fromProps(testProps)
+    val testServer = new SocketServer(config, metrics, Time.SYSTEM, 
credentialProvider, apiVersionManager)
+    val channel = testServer.controlPlaneAcceptorOpt.get.serverChannel
+    verifySocketUsesReuseAddress(channel)
+    shutdownServerAndMetrics(testServer)

Review Comment:
   perhaps do this in try/finally so that the resources are cleaned even if the 
test fails with an exception.



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1893,6 +1893,33 @@ class SocketServerTest {
     }, false)
   }
 
+  @Test
+  def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+    val acceptor = server.dataPlaneAcceptor(listener)
+    val channel = acceptor.get.serverChannel
+    verifySocketUsesReuseAddress(channel)
+  }
+
+  @Test
+  def testControlPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+    shutdownServerAndMetrics(server)
+    val testProps = new Properties
+    testProps ++= props
+    testProps.put("listeners", 
"PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
+    testProps.put("listener.security.protocol.map", 
"PLAINTEXT:PLAINTEXT,CONTROL_PLANE:PLAINTEXT")
+    testProps.put("control.plane.listener.name", "CONTROL_PLANE")
+    val config = KafkaConfig.fromProps(testProps)
+    val testServer = new SocketServer(config, metrics, Time.SYSTEM, 
credentialProvider, apiVersionManager)
+    val channel = testServer.controlPlaneAcceptorOpt.get.serverChannel

Review Comment:
   since we are starting up two listeners, could we verify reuse address for 
both please.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to