Repository: kafka
Updated Branches:
  refs/heads/trunk 75b0f30c4 -> 017a21c60


MINOR: Remove unneeded error handlers in deprecated request objects

These handlers were previously used on the broker to handle uncaught 
exceptions, but now the broker uses the new Java request objects exclusively.

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #3646 from hachikuji/remove-old-request-error-handlers


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/017a21c6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/017a21c6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/017a21c6

Branch: refs/heads/trunk
Commit: 017a21c6044189abe40b2b8b09ba51447646136e
Parents: 75b0f30
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Aug 10 11:07:53 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Aug 10 11:07:53 2017 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/api/FetchRequest.scala | 19 +---------------
 .../kafka/api/GroupCoordinatorRequest.scala     | 10 +--------
 .../scala/kafka/api/OffsetCommitRequest.scala   | 12 +---------
 .../scala/kafka/api/OffsetFetchRequest.scala    | 23 ++------------------
 .../main/scala/kafka/api/OffsetRequest.scala    | 13 +----------
 .../main/scala/kafka/api/ProducerRequest.scala  | 17 +--------------
 .../scala/kafka/api/TopicMetadataRequest.scala  | 12 +---------
 docs/upgrade.html                               |  5 +++++
 8 files changed, 13 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/017a21c6/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala 
b/core/src/main/scala/kafka/api/FetchRequest.scala
index 1f23e40..0379559 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -21,15 +21,10 @@ import kafka.utils.nonthreadsafe
 import kafka.api.ApiUtils._
 import kafka.common.TopicAndPartition
 import kafka.consumer.ConsumerConfig
-import kafka.network.RequestChannel
 import java.util.concurrent.atomic.AtomicInteger
 import java.nio.ByteBuffer
-import java.util
 
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.MemoryRecords
-import org.apache.kafka.common.requests.{FetchResponse => JFetchResponse}
+import org.apache.kafka.common.protocol.ApiKeys
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
@@ -200,18 +195,6 @@ case class FetchRequest(versionId: Short = 
FetchRequest.CurrentVersion,
     describe(true)
   }
 
-  override  def handleError(e: Throwable, requestChannel: RequestChannel, 
request: RequestChannel.Request): Unit = {
-    val responseData = new util.LinkedHashMap[TopicPartition, 
JFetchResponse.PartitionData]
-    requestInfo.foreach { case (TopicAndPartition(topic, partition), _) =>
-      responseData.put(new TopicPartition(topic, partition),
-        new JFetchResponse.PartitionData(Errors.forException(e), 
JFetchResponse.INVALID_HIGHWATERMARK,
-          JFetchResponse.INVALID_LAST_STABLE_OFFSET, 
JFetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
-    }
-    val errorResponse = new JFetchResponse(responseData, 0)
-    // Magic value does not matter here because the message set is empty
-    requestChannel.sendResponse(RequestChannel.Response(request, 
errorResponse))
-  }
-
   override def describe(details: Boolean): String = {
     val fetchRequest = new StringBuilder
     fetchRequest.append("Name: " + this.getClass.getSimpleName)

http://git-wip-us.apache.org/repos/asf/kafka/blob/017a21c6/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala 
b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
index 2f197bc..6ee6ae7 100644
--- a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
@@ -19,9 +19,7 @@ package kafka.api
 
 import java.nio.ByteBuffer
 
-import kafka.network.{RequestOrResponseSend, RequestChannel}
-import kafka.network.RequestChannel.Response
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.protocol.ApiKeys
 
 @deprecated("This object has been deprecated and will be removed in a future 
release.", "1.0.0")
 object GroupCoordinatorRequest {
@@ -64,12 +62,6 @@ case class GroupCoordinatorRequest(group: String,
     ApiUtils.writeShortString(buffer, group)
   }
 
-  override def handleError(e: Throwable, requestChannel: RequestChannel, 
request: RequestChannel.Request): Unit = {
-    // return ConsumerCoordinatorNotAvailable for all uncaught errors
-    val errorResponse = GroupCoordinatorResponse(None, 
Errors.COORDINATOR_NOT_AVAILABLE, correlationId)
-    requestChannel.sendResponse(Response(request, new 
RequestOrResponseSend(request.connectionId, errorResponse)))
-  }
-
   def describe(details: Boolean) = {
     val consumerMetadataRequest = new StringBuilder
     consumerMetadataRequest.append("Name: " + this.getClass.getSimpleName)

http://git-wip-us.apache.org/repos/asf/kafka/blob/017a21c6/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 5aaf0ec..bffcec3 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -21,10 +21,8 @@ import java.nio.ByteBuffer
 
 import kafka.api.ApiUtils._
 import kafka.common.{OffsetAndMetadata, TopicAndPartition}
-import kafka.network.{RequestOrResponseSend, RequestChannel}
-import kafka.network.RequestChannel.Response
 import kafka.utils.Logging
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.protocol.ApiKeys
 
 import scala.collection._
 
@@ -162,14 +160,6 @@ case class OffsetCommitRequest(groupId: String,
       })
     })
 
-  override def handleError(e: Throwable, requestChannel: RequestChannel, 
request: RequestChannel.Request): Unit = {
-    val error = Errors.forException(e)
-    val commitStatus = requestInfo.mapValues(_ => error)
-    val commitResponse = OffsetCommitResponse(commitStatus, correlationId)
-
-    requestChannel.sendResponse(Response(request, new 
RequestOrResponseSend(request.connectionId, commitResponse)))
-  }
-
   override def describe(details: Boolean): String = {
     val offsetCommitRequest = new StringBuilder
     offsetCommitRequest.append("Name: " + this.getClass.getSimpleName)

http://git-wip-us.apache.org/repos/asf/kafka/blob/017a21c6/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index 23bbad1..c24078d 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -20,11 +20,9 @@ package kafka.api
 import java.nio.ByteBuffer
 
 import kafka.api.ApiUtils._
-import kafka.common.{TopicAndPartition, _}
-import kafka.network.{RequestOrResponseSend, RequestChannel}
-import kafka.network.RequestChannel.Response
+import kafka.common.TopicAndPartition
 import kafka.utils.Logging
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.protocol.ApiKeys
 
 @deprecated("This object has been deprecated and will be removed in a future 
release.", "1.0.0")
 object OffsetFetchRequest extends Logging {
@@ -92,23 +90,6 @@ case class OffsetFetchRequest(groupId: String,
       t._2.size * 4 /* partition */
     })
 
-  override def handleError(e: Throwable, requestChannel: RequestChannel, 
request: RequestChannel.Request): Unit = {
-    val requestVersion = request.header.apiVersion
-
-    val thrownError = Errors.forException(e)
-    val responseMap =
-      if (requestVersion < 2) {
-        requestInfo.map {
-          topicAndPartition => (topicAndPartition, 
OffsetMetadataAndError(thrownError))
-        }.toMap
-      } else {
-        Map[kafka.common.TopicAndPartition, 
kafka.common.OffsetMetadataAndError]()
-      }
-
-    val errorResponse = OffsetFetchResponse(requestInfo=responseMap, 
correlationId=correlationId, error=thrownError)
-    requestChannel.sendResponse(Response(request, new 
RequestOrResponseSend(request.connectionId, errorResponse)))
-  }
-
   override def describe(details: Boolean): String = {
     val offsetFetchRequest = new StringBuilder
     offsetFetchRequest.append("Name: " + this.getClass.getSimpleName)

http://git-wip-us.apache.org/repos/asf/kafka/blob/017a21c6/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala 
b/core/src/main/scala/kafka/api/OffsetRequest.scala
index bc5b66d..f5483b1 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -21,10 +21,7 @@ import java.nio.ByteBuffer
 
 import kafka.api.ApiUtils._
 import kafka.common.TopicAndPartition
-import kafka.network.{RequestOrResponseSend, RequestChannel}
-import kafka.network.RequestChannel.Response
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-
+import org.apache.kafka.common.protocol.ApiKeys
 
 @deprecated("This object has been deprecated and will be removed in a future 
release.", "1.0.0")
 object OffsetRequest {
@@ -115,14 +112,6 @@ case class OffsetRequest(requestInfo: 
Map[TopicAndPartition, PartitionOffsetRequ
     describe(true)
   }
 
-  override  def handleError(e: Throwable, requestChannel: RequestChannel, 
request: RequestChannel.Request): Unit = {
-    val partitionOffsetResponseMap = requestInfo.map { case 
(topicAndPartition, _) =>
-        (topicAndPartition, PartitionOffsetsResponse(Errors.forException(e), 
Nil))
-    }
-    val errorResponse = OffsetResponse(correlationId, 
partitionOffsetResponseMap)
-    requestChannel.sendResponse(Response(request, new 
RequestOrResponseSend(request.connectionId, errorResponse)))
-  }
-
   override def describe(details: Boolean): String = {
     val offsetRequest = new StringBuilder
     offsetRequest.append("Name: " + this.getClass.getSimpleName)

http://git-wip-us.apache.org/repos/asf/kafka/blob/017a21c6/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala 
b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 13ed16c..921e011 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -22,9 +22,7 @@ import java.nio._
 import kafka.api.ApiUtils._
 import kafka.common._
 import kafka.message._
-import kafka.network.{RequestOrResponseSend, RequestChannel}
-import kafka.network.RequestChannel.Response
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.protocol.ApiKeys
 
 @deprecated("This object has been deprecated and will be removed in a future 
release.", "1.0.0")
 object ProducerRequest {
@@ -130,19 +128,6 @@ case class ProducerRequest(versionId: Short = 
ProducerRequest.CurrentVersion,
     describe(true)
   }
 
-  override def handleError(e: Throwable, requestChannel: RequestChannel, 
request: RequestChannel.Request): Unit = {
-    if (request.body[org.apache.kafka.common.requests.ProduceRequest].acks == 
0) {
-        requestChannel.sendResponse(new RequestChannel.Response(request, None, 
RequestChannel.CloseConnectionAction))
-    }
-    else {
-      val producerResponseStatus = data.map { case (topicAndPartition, _) =>
-        (topicAndPartition, ProducerResponseStatus(Errors.forException(e), 
-1l, Message.NoTimestamp))
-      }
-      val errorResponse = ProducerResponse(correlationId, 
producerResponseStatus)
-      requestChannel.sendResponse(Response(request, new 
RequestOrResponseSend(request.connectionId, errorResponse)))
-    }
-  }
-
   override def describe(details: Boolean): String = {
     val producerRequest = new StringBuilder
     producerRequest.append("Name: " + this.getClass.getSimpleName)

http://git-wip-us.apache.org/repos/asf/kafka/blob/017a21c6/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala 
b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 80107aa..217cedc 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -20,10 +20,8 @@ package kafka.api
 import java.nio.ByteBuffer
 
 import kafka.api.ApiUtils._
-import kafka.network.{RequestOrResponseSend, RequestChannel}
-import kafka.network.RequestChannel.Response
 import kafka.utils.Logging
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.protocol.ApiKeys
 
 @deprecated("This object has been deprecated and will be removed in a future 
release.", "1.0.0")
 object TopicMetadataRequest extends Logging {
@@ -61,14 +59,6 @@ case class TopicMetadataRequest(versionId: Short,
     describe(true)
   }
 
-  override def handleError(e: Throwable, requestChannel: RequestChannel, 
request: RequestChannel.Request): Unit = {
-    val topicMetadata = topics.map {
-      topic => TopicMetadata(topic, Nil, Errors.forException(e))
-    }
-    val errorResponse = TopicMetadataResponse(Seq(), topicMetadata, 
correlationId)
-    requestChannel.sendResponse(Response(request, new 
RequestOrResponseSend(request.connectionId, errorResponse)))
-  }
-
   override def describe(details: Boolean): String = {
     val topicMetadataRequest = new StringBuilder
     topicMetadataRequest.append("Name: " + this.getClass.getSimpleName)

http://git-wip-us.apache.org/repos/asf/kafka/blob/017a21c6/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index d5279f2..bb26c52 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -60,6 +60,11 @@
         if the version of client's FetchRequest or ProducerRequest does not 
support KafkaStorageException. </li>
     <li>-XX:+DisableExplicitGC was replaced by 
-XX:+ExplicitGCInvokesConcurrent in the default JVM settings. This helps
         avoid out of memory exceptions during allocation of native memory by 
direct buffers in some cases.</li>
+    <li>The overridden <code>handleError</code> method implementations have 
been removed from the following deprecated classes in
+        the <code>kafka.api</code> package: <code>FetchRequest</code>, 
<code>GroupCoordinatorRequest</code>, <code>OffsetCommitRequest</code>,
+        <code>OffsetFetchRequest</code>, <code>OffsetRequest</code>, 
<code>ProducerRequest</code>, and <code>TopicMetadataRequest</code>.
+        This was only intended for use on the broker, but it is no longer in 
use and the implementations have not been maintained.
+        A stub implementation has been retained for binary compatibility.</li>
 </ul>
 
 <h5><a id="upgrade_100_new_protocols" href="#upgrade_100_new_protocols">New 
Protocol Versions</a></h5>

Reply via email to