This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 300e825be74 MINOR Refactored share module classes (#17178)
300e825be74 is described below
commit 300e825be748e6aa55f86f6a6fa588398f1fcc73
Author: Apoorv Mittal <[email protected]>
AuthorDate: Thu Sep 12 18:32:15 2024 +0100
MINOR Refactored share module classes (#17178)
Reviewers: Andrew Schofield <[email protected]>, David Arthur
<[email protected]>
---
checkstyle/import-control-share.xml | 6 ++++++
core/src/main/java/kafka/server/share/SharePartition.java | 7 ++++++-
.../main/java/kafka/server/share/SharePartitionManager.java | 12 ++++++------
core/src/main/scala/kafka/server/BrokerServer.scala | 2 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 3 ++-
.../java/kafka/server/share/SharePartitionManagerTest.java | 12 ++++++------
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala | 4 +++-
.../kafka/server/share/{ => context}/FinalContext.java | 3 ++-
.../kafka/server/share/{ => context}/ShareFetchContext.java | 4 +++-
.../server/share/{ => context}/ShareSessionContext.java | 5 ++++-
.../apache/kafka/server/share/{ => session}/LastUsedKey.java | 2 +-
.../kafka/server/share/{ => session}/ShareSession.java | 3 ++-
.../kafka/server/share/{ => session}/ShareSessionCache.java | 3 ++-
.../kafka/server/share/{ => session}/ShareSessionKey.java | 2 +-
.../server/share/{ => session}/ShareSessionCacheTest.java | 3 ++-
.../kafka/server/share/{ => session}/ShareSessionTest.java | 2 +-
16 files changed, 48 insertions(+), 25 deletions(-)
diff --git a/checkstyle/import-control-share.xml
b/checkstyle/import-control-share.xml
index 1356966e4c6..531952b9118 100644
--- a/checkstyle/import-control-share.xml
+++ b/checkstyle/import-control-share.xml
@@ -39,4 +39,10 @@
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.requests" />
+ <subpackage name="server">
+ <subpackage name="share">
+ <allow pkg="org.apache.kafka.server.share" />
+ </subpackage>
+ </subpackage>
+
</import-control>
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 7fc3f8d30b2..274b934a088 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -1018,7 +1018,12 @@ public class SharePartition {
}
private void completeInitializationWithException(CompletableFuture<Void>
future, Throwable exception) {
- partitionState = SharePartitionState.FAILED;
+ lock.writeLock().lock();
+ try {
+ partitionState = SharePartitionState.FAILED;
+ } finally {
+ lock.writeLock().unlock();
+ }
future.completeExceptionally(exception);
}
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index d6d43170215..2665d7dd8b4 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -40,13 +40,13 @@ import
org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.group.share.Persister;
import org.apache.kafka.server.share.CachedSharePartition;
-import org.apache.kafka.server.share.FinalContext;
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
-import org.apache.kafka.server.share.ShareFetchContext;
-import org.apache.kafka.server.share.ShareSession;
-import org.apache.kafka.server.share.ShareSessionCache;
-import org.apache.kafka.server.share.ShareSessionContext;
-import org.apache.kafka.server.share.ShareSessionKey;
+import org.apache.kafka.server.share.context.FinalContext;
+import org.apache.kafka.server.share.context.ShareFetchContext;
+import org.apache.kafka.server.share.context.ShareSessionContext;
+import org.apache.kafka.server.share.session.ShareSession;
+import org.apache.kafka.server.share.session.ShareSessionCache;
+import org.apache.kafka.server.share.session.ShareSessionKey;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index f7b913b7f48..34ec3398b8e 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -51,7 +51,7 @@ import
org.apache.kafka.server.group.share.{NoOpShareStatePersister, Persister}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin,
KafkaYammerMetrics}
import org.apache.kafka.server.network.{EndpointReadyFutures,
KafkaAuthorizerServerInfo}
-import org.apache.kafka.server.share.ShareSessionCache
+import org.apache.kafka.server.share.session.ShareSessionCache
import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6e121bd8170..7a7ae151189 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -76,7 +76,8 @@ import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.{GroupVersion, MetadataVersion,
RequestLocal}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0,
IBP_2_3_IV0}
import org.apache.kafka.server.record.BrokerCompressionType
-import org.apache.kafka.server.share.{ErroneousAndValidPartitionData,
ShareAcknowledgementBatch, ShareFetchContext}
+import org.apache.kafka.server.share.context.ShareFetchContext
+import org.apache.kafka.server.share.{ErroneousAndValidPartitionData,
ShareAcknowledgementBatch}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation,
FetchParams, FetchPartitionData}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 9494f16c2d7..05c6b6c7264 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -53,13 +53,13 @@ import
org.apache.kafka.server.group.share.NoOpShareStatePersister;
import org.apache.kafka.server.group.share.Persister;
import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.ErroneousAndValidPartitionData;
-import org.apache.kafka.server.share.FinalContext;
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
-import org.apache.kafka.server.share.ShareFetchContext;
-import org.apache.kafka.server.share.ShareSession;
-import org.apache.kafka.server.share.ShareSessionCache;
-import org.apache.kafka.server.share.ShareSessionContext;
-import org.apache.kafka.server.share.ShareSessionKey;
+import org.apache.kafka.server.share.context.FinalContext;
+import org.apache.kafka.server.share.context.ShareFetchContext;
+import org.apache.kafka.server.share.context.ShareSessionContext;
+import org.apache.kafka.server.share.session.ShareSession;
+import org.apache.kafka.server.share.session.ShareSessionCache;
+import org.apache.kafka.server.share.session.ShareSessionKey;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.SystemTimer;
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index ee6cecb97c6..22a0140ea2b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -88,8 +88,10 @@ import
org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_I
import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures,
GroupVersion, KRaftVersion, MetadataVersion, RequestLocal}
import org.apache.kafka.server.config.{ConfigType, KRaftConfigs,
ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareGroupConfig}
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
-import org.apache.kafka.server.share.{CachedSharePartition,
ErroneousAndValidPartitionData, FinalContext, ShareAcknowledgementBatch,
ShareSession, ShareSessionContext, ShareSessionKey}
+import org.apache.kafka.server.share.{CachedSharePartition,
ErroneousAndValidPartitionData, ShareAcknowledgementBatch}
import org.apache.kafka.server.quota.ThrottleCallback
+import org.apache.kafka.server.share.context.{FinalContext,
ShareSessionContext}
+import org.apache.kafka.server.share.session.{ShareSession, ShareSessionKey}
import org.apache.kafka.server.util.{FutureUtils, MockTime}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams,
FetchPartitionData, LogConfig}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
diff --git
a/share/src/main/java/org/apache/kafka/server/share/FinalContext.java
b/share/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
similarity index 95%
rename from share/src/main/java/org/apache/kafka/server/share/FinalContext.java
rename to
share/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
index 35da343b514..ab767cdb237 100644
--- a/share/src/main/java/org/apache/kafka/server/share/FinalContext.java
+++
b/share/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
@@ -15,13 +15,14 @@
* limitations under the License.
*/
-package org.apache.kafka.server.share;
+package org.apache.kafka.server.share.context;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareFetchResponse;
+import org.apache.kafka.server.share.ErroneousAndValidPartitionData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/share/src/main/java/org/apache/kafka/server/share/ShareFetchContext.java
b/share/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
similarity index 95%
rename from
share/src/main/java/org/apache/kafka/server/share/ShareFetchContext.java
rename to
share/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
index 7714b022819..2c472660d49 100644
--- a/share/src/main/java/org/apache/kafka/server/share/ShareFetchContext.java
+++
b/share/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
@@ -14,13 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.server.share;
+package org.apache.kafka.server.share.context;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareFetchResponse;
+import org.apache.kafka.server.share.ErroneousAndValidPartitionData;
+import org.apache.kafka.server.share.session.ShareSession;
import java.util.Collection;
import java.util.Collections;
diff --git
a/share/src/main/java/org/apache/kafka/server/share/ShareSessionContext.java
b/share/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
similarity index 98%
rename from
share/src/main/java/org/apache/kafka/server/share/ShareSessionContext.java
rename to
share/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
index fee03eeb311..1936ec1f0cc 100644
--- a/share/src/main/java/org/apache/kafka/server/share/ShareSessionContext.java
+++
b/share/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.kafka.server.share;
+package org.apache.kafka.server.share.context;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
@@ -27,6 +27,9 @@ import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareFetchRequest.SharePartitionData;
import org.apache.kafka.common.requests.ShareFetchResponse;
import org.apache.kafka.common.requests.ShareRequestMetadata;
+import org.apache.kafka.server.share.CachedSharePartition;
+import org.apache.kafka.server.share.ErroneousAndValidPartitionData;
+import org.apache.kafka.server.share.session.ShareSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/share/src/main/java/org/apache/kafka/server/share/LastUsedKey.java
b/share/src/main/java/org/apache/kafka/server/share/session/LastUsedKey.java
similarity index 97%
rename from share/src/main/java/org/apache/kafka/server/share/LastUsedKey.java
rename to
share/src/main/java/org/apache/kafka/server/share/session/LastUsedKey.java
index e73ad12e2e8..e6084f8e9b5 100644
--- a/share/src/main/java/org/apache/kafka/server/share/LastUsedKey.java
+++ b/share/src/main/java/org/apache/kafka/server/share/session/LastUsedKey.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.kafka.server.share;
+package org.apache.kafka.server.share.session;
import java.util.Objects;
diff --git
a/share/src/main/java/org/apache/kafka/server/share/ShareSession.java
b/share/src/main/java/org/apache/kafka/server/share/session/ShareSession.java
similarity index 98%
rename from share/src/main/java/org/apache/kafka/server/share/ShareSession.java
rename to
share/src/main/java/org/apache/kafka/server/share/session/ShareSession.java
index 12124f51168..97e46d62126 100644
--- a/share/src/main/java/org/apache/kafka/server/share/ShareSession.java
+++
b/share/src/main/java/org/apache/kafka/server/share/session/ShareSession.java
@@ -15,11 +15,12 @@
* limitations under the License.
*/
-package org.apache.kafka.server.share;
+package org.apache.kafka.server.share.session;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
+import org.apache.kafka.server.share.CachedSharePartition;
import java.util.ArrayList;
import java.util.Collection;
diff --git
a/share/src/main/java/org/apache/kafka/server/share/ShareSessionCache.java
b/share/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
similarity index 98%
rename from
share/src/main/java/org/apache/kafka/server/share/ShareSessionCache.java
rename to
share/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
index 34b15912b00..28df334d0ac 100644
--- a/share/src/main/java/org/apache/kafka/server/share/ShareSessionCache.java
+++
b/share/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
@@ -15,11 +15,12 @@
* limitations under the License.
*/
-package org.apache.kafka.server.share;
+package org.apache.kafka.server.share.session;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.ShareRequestMetadata;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
+import org.apache.kafka.server.share.CachedSharePartition;
import java.util.HashMap;
import java.util.Map;
diff --git
a/share/src/main/java/org/apache/kafka/server/share/ShareSessionKey.java
b/share/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java
similarity index 97%
rename from
share/src/main/java/org/apache/kafka/server/share/ShareSessionKey.java
rename to
share/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java
index 0bf56fdeb26..c7709857f5c 100644
--- a/share/src/main/java/org/apache/kafka/server/share/ShareSessionKey.java
+++
b/share/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.kafka.server.share;
+package org.apache.kafka.server.share.session;
import org.apache.kafka.common.Uuid;
diff --git
a/share/src/test/java/org/apache/kafka/server/share/ShareSessionCacheTest.java
b/share/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
similarity index 98%
rename from
share/src/test/java/org/apache/kafka/server/share/ShareSessionCacheTest.java
rename to
share/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
index e0ac30d7c42..7b8d42cd859 100644
---
a/share/src/test/java/org/apache/kafka/server/share/ShareSessionCacheTest.java
+++
b/share/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
@@ -14,10 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.server.share;
+package org.apache.kafka.server.share.session;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
+import org.apache.kafka.server.share.CachedSharePartition;
import org.junit.jupiter.api.Test;
diff --git
a/share/src/test/java/org/apache/kafka/server/share/ShareSessionTest.java
b/share/src/test/java/org/apache/kafka/server/share/session/ShareSessionTest.java
similarity index 97%
rename from
share/src/test/java/org/apache/kafka/server/share/ShareSessionTest.java
rename to
share/src/test/java/org/apache/kafka/server/share/session/ShareSessionTest.java
index 66879633619..a9022bb2fea 100644
--- a/share/src/test/java/org/apache/kafka/server/share/ShareSessionTest.java
+++
b/share/src/test/java/org/apache/kafka/server/share/session/ShareSessionTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.server.share;
+package org.apache.kafka.server.share.session;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;