This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 300e825be74 MINOR Refactored share module classes (#17178)
300e825be74 is described below

commit 300e825be748e6aa55f86f6a6fa588398f1fcc73
Author: Apoorv Mittal <[email protected]>
AuthorDate: Thu Sep 12 18:32:15 2024 +0100

    MINOR Refactored share module classes (#17178)
    
    Reviewers: Andrew Schofield <[email protected]>, David Arthur 
<[email protected]>
---
 checkstyle/import-control-share.xml                          |  6 ++++++
 core/src/main/java/kafka/server/share/SharePartition.java    |  7 ++++++-
 .../main/java/kafka/server/share/SharePartitionManager.java  | 12 ++++++------
 core/src/main/scala/kafka/server/BrokerServer.scala          |  2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala             |  3 ++-
 .../java/kafka/server/share/SharePartitionManagerTest.java   | 12 ++++++------
 core/src/test/scala/unit/kafka/server/KafkaApisTest.scala    |  4 +++-
 .../kafka/server/share/{ => context}/FinalContext.java       |  3 ++-
 .../kafka/server/share/{ => context}/ShareFetchContext.java  |  4 +++-
 .../server/share/{ => context}/ShareSessionContext.java      |  5 ++++-
 .../apache/kafka/server/share/{ => session}/LastUsedKey.java |  2 +-
 .../kafka/server/share/{ => session}/ShareSession.java       |  3 ++-
 .../kafka/server/share/{ => session}/ShareSessionCache.java  |  3 ++-
 .../kafka/server/share/{ => session}/ShareSessionKey.java    |  2 +-
 .../server/share/{ => session}/ShareSessionCacheTest.java    |  3 ++-
 .../kafka/server/share/{ => session}/ShareSessionTest.java   |  2 +-
 16 files changed, 48 insertions(+), 25 deletions(-)

diff --git a/checkstyle/import-control-share.xml 
b/checkstyle/import-control-share.xml
index 1356966e4c6..531952b9118 100644
--- a/checkstyle/import-control-share.xml
+++ b/checkstyle/import-control-share.xml
@@ -39,4 +39,10 @@
   <allow pkg="org.apache.kafka.common.protocol" />
   <allow pkg="org.apache.kafka.common.requests" />
 
+  <subpackage name="server">
+    <subpackage name="share">
+      <allow pkg="org.apache.kafka.server.share" />
+    </subpackage>
+  </subpackage>
+
 </import-control>
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 7fc3f8d30b2..274b934a088 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -1018,7 +1018,12 @@ public class SharePartition {
     }
 
     private void completeInitializationWithException(CompletableFuture<Void> 
future, Throwable exception) {
-        partitionState = SharePartitionState.FAILED;
+        lock.writeLock().lock();
+        try {
+            partitionState = SharePartitionState.FAILED;
+        } finally {
+            lock.writeLock().unlock();
+        }
         future.completeExceptionally(exception);
     }
 
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java 
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index d6d43170215..2665d7dd8b4 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -40,13 +40,13 @@ import 
org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.group.share.Persister;
 import org.apache.kafka.server.share.CachedSharePartition;
-import org.apache.kafka.server.share.FinalContext;
 import org.apache.kafka.server.share.ShareAcknowledgementBatch;
-import org.apache.kafka.server.share.ShareFetchContext;
-import org.apache.kafka.server.share.ShareSession;
-import org.apache.kafka.server.share.ShareSessionCache;
-import org.apache.kafka.server.share.ShareSessionContext;
-import org.apache.kafka.server.share.ShareSessionKey;
+import org.apache.kafka.server.share.context.FinalContext;
+import org.apache.kafka.server.share.context.ShareFetchContext;
+import org.apache.kafka.server.share.context.ShareSessionContext;
+import org.apache.kafka.server.share.session.ShareSession;
+import org.apache.kafka.server.share.session.ShareSessionCache;
+import org.apache.kafka.server.share.session.ShareSessionKey;
 import org.apache.kafka.server.util.FutureUtils;
 import org.apache.kafka.server.util.timer.SystemTimer;
 import org.apache.kafka.server.util.timer.SystemTimerReaper;
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index f7b913b7f48..34ec3398b8e 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -51,7 +51,7 @@ import 
org.apache.kafka.server.group.share.{NoOpShareStatePersister, Persister}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, 
KafkaYammerMetrics}
 import org.apache.kafka.server.network.{EndpointReadyFutures, 
KafkaAuthorizerServerInfo}
-import org.apache.kafka.server.share.ShareSessionCache
+import org.apache.kafka.server.share.session.ShareSessionCache
 import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
 import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6e121bd8170..7a7ae151189 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -76,7 +76,8 @@ import org.apache.kafka.server.authorizer._
 import org.apache.kafka.server.common.{GroupVersion, MetadataVersion, 
RequestLocal}
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, 
IBP_2_3_IV0}
 import org.apache.kafka.server.record.BrokerCompressionType
-import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, 
ShareAcknowledgementBatch, ShareFetchContext}
+import org.apache.kafka.server.share.context.ShareFetchContext
+import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, 
ShareAcknowledgementBatch}
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, 
FetchParams, FetchPartitionData}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 9494f16c2d7..05c6b6c7264 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -53,13 +53,13 @@ import 
org.apache.kafka.server.group.share.NoOpShareStatePersister;
 import org.apache.kafka.server.group.share.Persister;
 import org.apache.kafka.server.share.CachedSharePartition;
 import org.apache.kafka.server.share.ErroneousAndValidPartitionData;
-import org.apache.kafka.server.share.FinalContext;
 import org.apache.kafka.server.share.ShareAcknowledgementBatch;
-import org.apache.kafka.server.share.ShareFetchContext;
-import org.apache.kafka.server.share.ShareSession;
-import org.apache.kafka.server.share.ShareSessionCache;
-import org.apache.kafka.server.share.ShareSessionContext;
-import org.apache.kafka.server.share.ShareSessionKey;
+import org.apache.kafka.server.share.context.FinalContext;
+import org.apache.kafka.server.share.context.ShareFetchContext;
+import org.apache.kafka.server.share.context.ShareSessionContext;
+import org.apache.kafka.server.share.session.ShareSession;
+import org.apache.kafka.server.share.session.ShareSessionCache;
+import org.apache.kafka.server.share.session.ShareSessionKey;
 import org.apache.kafka.server.util.FutureUtils;
 import org.apache.kafka.server.util.timer.MockTimer;
 import org.apache.kafka.server.util.timer.SystemTimer;
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index ee6cecb97c6..22a0140ea2b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -88,8 +88,10 @@ import 
org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_I
 import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, 
GroupVersion, KRaftVersion, MetadataVersion, RequestLocal}
 import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, 
ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareGroupConfig}
 import org.apache.kafka.server.metrics.ClientMetricsTestUtils
-import org.apache.kafka.server.share.{CachedSharePartition, 
ErroneousAndValidPartitionData, FinalContext, ShareAcknowledgementBatch, 
ShareSession, ShareSessionContext, ShareSessionKey}
+import org.apache.kafka.server.share.{CachedSharePartition, 
ErroneousAndValidPartitionData, ShareAcknowledgementBatch}
 import org.apache.kafka.server.quota.ThrottleCallback
+import org.apache.kafka.server.share.context.{FinalContext, 
ShareSessionContext}
+import org.apache.kafka.server.share.session.{ShareSession, ShareSessionKey}
 import org.apache.kafka.server.util.{FutureUtils, MockTime}
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, 
FetchPartitionData, LogConfig}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
diff --git 
a/share/src/main/java/org/apache/kafka/server/share/FinalContext.java 
b/share/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
similarity index 95%
rename from share/src/main/java/org/apache/kafka/server/share/FinalContext.java
rename to 
share/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
index 35da343b514..ab767cdb237 100644
--- a/share/src/main/java/org/apache/kafka/server/share/FinalContext.java
+++ 
b/share/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
@@ -15,13 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.server.share;
+package org.apache.kafka.server.share.context;
 
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ShareFetchResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ShareFetchResponse;
+import org.apache.kafka.server.share.ErroneousAndValidPartitionData;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/share/src/main/java/org/apache/kafka/server/share/ShareFetchContext.java 
b/share/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
similarity index 95%
rename from 
share/src/main/java/org/apache/kafka/server/share/ShareFetchContext.java
rename to 
share/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
index 7714b022819..2c472660d49 100644
--- a/share/src/main/java/org/apache/kafka/server/share/ShareFetchContext.java
+++ 
b/share/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
@@ -14,13 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.server.share;
+package org.apache.kafka.server.share.context;
 
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ShareFetchResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ShareFetchResponse;
+import org.apache.kafka.server.share.ErroneousAndValidPartitionData;
+import org.apache.kafka.server.share.session.ShareSession;
 
 import java.util.Collection;
 import java.util.Collections;
diff --git 
a/share/src/main/java/org/apache/kafka/server/share/ShareSessionContext.java 
b/share/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
similarity index 98%
rename from 
share/src/main/java/org/apache/kafka/server/share/ShareSessionContext.java
rename to 
share/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
index fee03eeb311..1936ec1f0cc 100644
--- a/share/src/main/java/org/apache/kafka/server/share/ShareSessionContext.java
+++ 
b/share/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.server.share;
+package org.apache.kafka.server.share.context;
 
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
@@ -27,6 +27,9 @@ import org.apache.kafka.common.requests.ShareFetchRequest;
 import org.apache.kafka.common.requests.ShareFetchRequest.SharePartitionData;
 import org.apache.kafka.common.requests.ShareFetchResponse;
 import org.apache.kafka.common.requests.ShareRequestMetadata;
+import org.apache.kafka.server.share.CachedSharePartition;
+import org.apache.kafka.server.share.ErroneousAndValidPartitionData;
+import org.apache.kafka.server.share.session.ShareSession;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/share/src/main/java/org/apache/kafka/server/share/LastUsedKey.java 
b/share/src/main/java/org/apache/kafka/server/share/session/LastUsedKey.java
similarity index 97%
rename from share/src/main/java/org/apache/kafka/server/share/LastUsedKey.java
rename to 
share/src/main/java/org/apache/kafka/server/share/session/LastUsedKey.java
index e73ad12e2e8..e6084f8e9b5 100644
--- a/share/src/main/java/org/apache/kafka/server/share/LastUsedKey.java
+++ b/share/src/main/java/org/apache/kafka/server/share/session/LastUsedKey.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.server.share;
+package org.apache.kafka.server.share.session;
 
 import java.util.Objects;
 
diff --git 
a/share/src/main/java/org/apache/kafka/server/share/ShareSession.java 
b/share/src/main/java/org/apache/kafka/server/share/session/ShareSession.java
similarity index 98%
rename from share/src/main/java/org/apache/kafka/server/share/ShareSession.java
rename to 
share/src/main/java/org/apache/kafka/server/share/session/ShareSession.java
index 12124f51168..97e46d62126 100644
--- a/share/src/main/java/org/apache/kafka/server/share/ShareSession.java
+++ 
b/share/src/main/java/org/apache/kafka/server/share/session/ShareSession.java
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.server.share;
+package org.apache.kafka.server.share.session;
 
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.requests.ShareFetchRequest;
 import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
+import org.apache.kafka.server.share.CachedSharePartition;
 
 import java.util.ArrayList;
 import java.util.Collection;
diff --git 
a/share/src/main/java/org/apache/kafka/server/share/ShareSessionCache.java 
b/share/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
similarity index 98%
rename from 
share/src/main/java/org/apache/kafka/server/share/ShareSessionCache.java
rename to 
share/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
index 34b15912b00..28df334d0ac 100644
--- a/share/src/main/java/org/apache/kafka/server/share/ShareSessionCache.java
+++ 
b/share/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.server.share;
+package org.apache.kafka.server.share.session;
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.requests.ShareRequestMetadata;
 import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
+import org.apache.kafka.server.share.CachedSharePartition;
 
 import java.util.HashMap;
 import java.util.Map;
diff --git 
a/share/src/main/java/org/apache/kafka/server/share/ShareSessionKey.java 
b/share/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java
similarity index 97%
rename from 
share/src/main/java/org/apache/kafka/server/share/ShareSessionKey.java
rename to 
share/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java
index 0bf56fdeb26..c7709857f5c 100644
--- a/share/src/main/java/org/apache/kafka/server/share/ShareSessionKey.java
+++ 
b/share/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.server.share;
+package org.apache.kafka.server.share.session;
 
 import org.apache.kafka.common.Uuid;
 
diff --git 
a/share/src/test/java/org/apache/kafka/server/share/ShareSessionCacheTest.java 
b/share/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
similarity index 98%
rename from 
share/src/test/java/org/apache/kafka/server/share/ShareSessionCacheTest.java
rename to 
share/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
index e0ac30d7c42..7b8d42cd859 100644
--- 
a/share/src/test/java/org/apache/kafka/server/share/ShareSessionCacheTest.java
+++ 
b/share/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
@@ -14,10 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.server.share;
+package org.apache.kafka.server.share.session;
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
+import org.apache.kafka.server.share.CachedSharePartition;
 
 import org.junit.jupiter.api.Test;
 
diff --git 
a/share/src/test/java/org/apache/kafka/server/share/ShareSessionTest.java 
b/share/src/test/java/org/apache/kafka/server/share/session/ShareSessionTest.java
similarity index 97%
rename from 
share/src/test/java/org/apache/kafka/server/share/ShareSessionTest.java
rename to 
share/src/test/java/org/apache/kafka/server/share/session/ShareSessionTest.java
index 66879633619..a9022bb2fea 100644
--- a/share/src/test/java/org/apache/kafka/server/share/ShareSessionTest.java
+++ 
b/share/src/test/java/org/apache/kafka/server/share/session/ShareSessionTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.server.share;
+package org.apache.kafka.server.share.session;
 
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.Uuid;

Reply via email to