This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new f7cb66044d Port akka-core#31859: Improve StopShards coordinator
command (#2783)
f7cb66044d is described below
commit f7cb66044dfa881fea11cd5264313cf440ae5bb2
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Mar 24 08:59:23 2026 +0100
Port akka-core#31859: Improve StopShards coordinator command (#2783)
* Initial plan
* Copy akka-core PR #31859: Improve StopShards command in ShardCoordinator
Co-authored-by: pjfanning <[email protected]>
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/ee962896-5c0b-4e9c-8a67-bc38f3364c04
* Fix camelCase naming: startShardregion -> startShardRegion in
StopShardsSpec
Co-authored-by: pjfanning <[email protected]>
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/ee962896-5c0b-4e9c-8a67-bc38f3364c04
* format
* Update
cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/StopShardsSpec.scala
Co-authored-by: Copilot <[email protected]>
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
Co-authored-by: Copilot <[email protected]>
---
.../protobuf/msg/ClusterShardingMessages.java | 623 ++++++++++++++++++++-
.../main/protobuf/ClusterShardingMessages.proto | 4 +
.../pekko/cluster/sharding/ShardCoordinator.scala | 66 ++-
.../ClusterShardingMessageSerializer.scala | 20 +-
.../pekko/cluster/sharding/StopShardsSpec.scala | 144 +++++
.../ClusterShardingMessageSerializerSpec.scala | 4 +
6 files changed, 835 insertions(+), 26 deletions(-)
diff --git
a/cluster-sharding/src/main/java/org/apache/pekko/cluster/sharding/protobuf/msg/ClusterShardingMessages.java
b/cluster-sharding/src/main/java/org/apache/pekko/cluster/sharding/protobuf/msg/ClusterShardingMessages.java
index 30b850f207..2e9d343cef 100644
---
a/cluster-sharding/src/main/java/org/apache/pekko/cluster/sharding/protobuf/msg/ClusterShardingMessages.java
+++
b/cluster-sharding/src/main/java/org/apache/pekko/cluster/sharding/protobuf/msg/ClusterShardingMessages.java
@@ -14100,6 +14100,581 @@ public final class ClusterShardingMessages extends
org.apache.pekko.protobufv3.i
}
+ public interface StopShardsOrBuilder extends
+ // @@protoc_insertion_point(interface_extends:StopShards)
+ org.apache.pekko.protobufv3.internal.MessageOrBuilder {
+
+ /**
+ * <code>repeated string shards = 1;</code>
+ * @return A list containing the shards.
+ */
+ java.util.List<java.lang.String>
+ getShardsList();
+ /**
+ * <code>repeated string shards = 1;</code>
+ * @return The count of shards.
+ */
+ int getShardsCount();
+ /**
+ * <code>repeated string shards = 1;</code>
+ * @param index The index of the element to return.
+ * @return The shards at the given index.
+ */
+ java.lang.String getShards(int index);
+ /**
+ * <code>repeated string shards = 1;</code>
+ * @param index The index of the value to return.
+ * @return The bytes of the shards at the given index.
+ */
+ org.apache.pekko.protobufv3.internal.ByteString
+ getShardsBytes(int index);
+ }
+ /**
+ * Protobuf type {@code StopShards}
+ */
+ public static final class StopShards extends
+ org.apache.pekko.protobufv3.internal.GeneratedMessage implements
+ // @@protoc_insertion_point(message_implements:StopShards)
+ StopShardsOrBuilder {
+ private static final long serialVersionUID = 0L;
+ static {
+
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
+
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
+ /* major= */ 4,
+ /* minor= */ 33,
+ /* patch= */ 0,
+ /* suffix= */ "",
+ "StopShards");
+ }
+ // Use StopShards.newBuilder() to construct.
+ private
StopShards(org.apache.pekko.protobufv3.internal.GeneratedMessage.Builder<?>
builder) {
+ super(builder);
+ }
+ private StopShards() {
+ shards_ =
+ org.apache.pekko.protobufv3.internal.LazyStringArrayList.emptyList();
+ }
+
+ public static final
org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
+ getDescriptor() {
+ return
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StopShards_descriptor;
+ }
+
+ @java.lang.Override
+ protected
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StopShards_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards.class,
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards.Builder.class);
+ }
+
+ public static final int SHARDS_FIELD_NUMBER = 1;
+ @SuppressWarnings("serial")
+ private org.apache.pekko.protobufv3.internal.LazyStringArrayList shards_ =
+ org.apache.pekko.protobufv3.internal.LazyStringArrayList.emptyList();
+ /**
+ * <code>repeated string shards = 1;</code>
+ * @return A list containing the shards.
+ */
+ public org.apache.pekko.protobufv3.internal.ProtocolStringList
+ getShardsList() {
+ return shards_;
+ }
+ /**
+ * <code>repeated string shards = 1;</code>
+ * @return The count of shards.
+ */
+ public int getShardsCount() {
+ return shards_.size();
+ }
+ /**
+ * <code>repeated string shards = 1;</code>
+ * @param index The index of the element to return.
+ * @return The shards at the given index.
+ */
+ public java.lang.String getShards(int index) {
+ return shards_.get(index);
+ }
+ /**
+ * <code>repeated string shards = 1;</code>
+ * @param index The index of the value to return.
+ * @return The bytes of the shards at the given index.
+ */
+ public org.apache.pekko.protobufv3.internal.ByteString
+ getShardsBytes(int index) {
+ return shards_.getByteString(index);
+ }
+
+ private byte memoizedIsInitialized = -1;
+ @java.lang.Override
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized == 1) return true;
+ if (isInitialized == 0) return false;
+
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ @java.lang.Override
+ public void writeTo(org.apache.pekko.protobufv3.internal.CodedOutputStream
output)
+ throws java.io.IOException {
+ for (int i = 0; i < shards_.size(); i++) {
+
org.apache.pekko.protobufv3.internal.GeneratedMessage.writeString(output, 1,
shards_.getRaw(i));
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ @java.lang.Override
+ public int getSerializedSize() {
+ int size = memoizedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ {
+ int dataSize = 0;
+ for (int i = 0; i < shards_.size(); i++) {
+ dataSize += computeStringSizeNoTag(shards_.getRaw(i));
+ }
+ size += dataSize;
+ size += 1 * getShardsList().size();
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSize = size;
+ return size;
+ }
+
+ @java.lang.Override
+ public boolean equals(final java.lang.Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards))
{
+ return super.equals(obj);
+ }
+
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
other =
(org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards)
obj;
+
+ if (!getShardsList()
+ .equals(other.getShardsList())) return false;
+ if (!getUnknownFields().equals(other.getUnknownFields())) return false;
+ return true;
+ }
+
+ @java.lang.Override
+ public int hashCode() {
+ if (memoizedHashCode != 0) {
+ return memoizedHashCode;
+ }
+ int hash = 41;
+ hash = (19 * hash) + getDescriptor().hashCode();
+ if (getShardsCount() > 0) {
+ hash = (37 * hash) + SHARDS_FIELD_NUMBER;
+ hash = (53 * hash) + getShardsList().hashCode();
+ }
+ hash = (29 * hash) + getUnknownFields().hashCode();
+ memoizedHashCode = hash;
+ return hash;
+ }
+
+ public static
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
parseFrom(
+ java.nio.ByteBuffer data)
+ throws
org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
parseFrom(
+ java.nio.ByteBuffer data,
+ org.apache.pekko.protobufv3.internal.ExtensionRegistryLite
extensionRegistry)
+ throws
org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
parseFrom(
+ org.apache.pekko.protobufv3.internal.ByteString data)
+ throws
org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
parseFrom(
+ org.apache.pekko.protobufv3.internal.ByteString data,
+ org.apache.pekko.protobufv3.internal.ExtensionRegistryLite
extensionRegistry)
+ throws
org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
parseFrom(byte[] data)
+ throws
org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
parseFrom(
+ byte[] data,
+ org.apache.pekko.protobufv3.internal.ExtensionRegistryLite
extensionRegistry)
+ throws
org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return org.apache.pekko.protobufv3.internal.GeneratedMessage
+ .parseWithIOException(PARSER, input);
+ }
+ public static
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
parseFrom(
+ java.io.InputStream input,
+ org.apache.pekko.protobufv3.internal.ExtensionRegistryLite
extensionRegistry)
+ throws java.io.IOException {
+ return org.apache.pekko.protobufv3.internal.GeneratedMessage
+ .parseWithIOException(PARSER, input, extensionRegistry);
+ }
+
+ public static
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return org.apache.pekko.protobufv3.internal.GeneratedMessage
+ .parseDelimitedWithIOException(PARSER, input);
+ }
+
+ public static
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
parseDelimitedFrom(
+ java.io.InputStream input,
+ org.apache.pekko.protobufv3.internal.ExtensionRegistryLite
extensionRegistry)
+ throws java.io.IOException {
+ return org.apache.pekko.protobufv3.internal.GeneratedMessage
+ .parseDelimitedWithIOException(PARSER, input, extensionRegistry);
+ }
+ public static
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
parseFrom(
+ org.apache.pekko.protobufv3.internal.CodedInputStream input)
+ throws java.io.IOException {
+ return org.apache.pekko.protobufv3.internal.GeneratedMessage
+ .parseWithIOException(PARSER, input);
+ }
+ public static
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
parseFrom(
+ org.apache.pekko.protobufv3.internal.CodedInputStream input,
+ org.apache.pekko.protobufv3.internal.ExtensionRegistryLite
extensionRegistry)
+ throws java.io.IOException {
+ return org.apache.pekko.protobufv3.internal.GeneratedMessage
+ .parseWithIOException(PARSER, input, extensionRegistry);
+ }
+
+ @java.lang.Override
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder() {
+ return DEFAULT_INSTANCE.toBuilder();
+ }
+ public static Builder
newBuilder(org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
prototype) {
+ return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
+ }
+ @java.lang.Override
+ public Builder toBuilder() {
+ return this == DEFAULT_INSTANCE
+ ? new Builder() : new Builder().mergeFrom(this);
+ }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ org.apache.pekko.protobufv3.internal.GeneratedMessage.BuilderParent
parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code StopShards}
+ */
+ public static final class Builder extends
+ org.apache.pekko.protobufv3.internal.GeneratedMessage.Builder<Builder>
implements
+ // @@protoc_insertion_point(builder_implements:StopShards)
+
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShardsOrBuilder
{
+ public static final
org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
+ getDescriptor() {
+ return
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StopShards_descriptor;
+ }
+
+ @java.lang.Override
+ protected
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StopShards_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards.class,
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards.Builder.class);
+ }
+
+ // Construct using
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards.newBuilder()
+ private Builder() {
+
+ }
+
+ private Builder(
+ org.apache.pekko.protobufv3.internal.GeneratedMessage.BuilderParent
parent) {
+ super(parent);
+
+ }
+ @java.lang.Override
+ public Builder clear() {
+ super.clear();
+ bitField0_ = 0;
+ shards_ =
+
org.apache.pekko.protobufv3.internal.LazyStringArrayList.emptyList();
+ return this;
+ }
+
+ @java.lang.Override
+ public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
+ getDescriptorForType() {
+ return
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StopShards_descriptor;
+ }
+
+ @java.lang.Override
+ public
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
getDefaultInstanceForType() {
+ return
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards.getDefaultInstance();
+ }
+
+ @java.lang.Override
+ public
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
build() {
+
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ @java.lang.Override
+ public
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
buildPartial() {
+
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
result = new
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards(this);
+ if (bitField0_ != 0) { buildPartial0(result); }
+ onBuilt();
+ return result;
+ }
+
+ private void
buildPartial0(org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
result) {
+ int from_bitField0_ = bitField0_;
+ if (((from_bitField0_ & 0x00000001) != 0)) {
+ shards_.makeImmutable();
+ result.shards_ = shards_;
+ }
+ }
+
+ @java.lang.Override
+ public Builder mergeFrom(org.apache.pekko.protobufv3.internal.Message
other) {
+ if (other instanceof
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards)
{
+ return
mergeFrom((org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder
mergeFrom(org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
other) {
+ if (other ==
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards.getDefaultInstance())
return this;
+ if (!other.shards_.isEmpty()) {
+ if (shards_.isEmpty()) {
+ shards_ = other.shards_;
+ bitField0_ |= 0x00000001;
+ } else {
+ ensureShardsIsMutable();
+ shards_.addAll(other.shards_);
+ }
+ onChanged();
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ onChanged();
+ return this;
+ }
+
+ @java.lang.Override
+ public final boolean isInitialized() {
+ return true;
+ }
+
+ @java.lang.Override
+ public Builder mergeFrom(
+ org.apache.pekko.protobufv3.internal.CodedInputStream input,
+ org.apache.pekko.protobufv3.internal.ExtensionRegistryLite
extensionRegistry)
+ throws java.io.IOException {
+ if (extensionRegistry == null) {
+ throw new java.lang.NullPointerException();
+ }
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ case 10: {
+ org.apache.pekko.protobufv3.internal.ByteString bs =
input.readBytes();
+ ensureShardsIsMutable();
+ shards_.add(bs);
+ break;
+ } // case 10
+ default: {
+ if (!super.parseUnknownField(input, extensionRegistry, tag)) {
+ done = true; // was an endgroup tag
+ }
+ break;
+ } // default:
+ } // switch (tag)
+ } // while (!done)
+ } catch
(org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException e) {
+ throw e.unwrapIOException();
+ } finally {
+ onChanged();
+ } // finally
+ return this;
+ }
+ private int bitField0_;
+
+ private org.apache.pekko.protobufv3.internal.LazyStringArrayList shards_
=
+ org.apache.pekko.protobufv3.internal.LazyStringArrayList.emptyList();
+ private void ensureShardsIsMutable() {
+ if (!shards_.isModifiable()) {
+ shards_ = new
org.apache.pekko.protobufv3.internal.LazyStringArrayList(shards_);
+ }
+ bitField0_ |= 0x00000001;
+ }
+ /**
+ * <code>repeated string shards = 1;</code>
+ * @return A list containing the shards.
+ */
+ public org.apache.pekko.protobufv3.internal.ProtocolStringList
+ getShardsList() {
+ shards_.makeImmutable();
+ return shards_;
+ }
+ /**
+ * <code>repeated string shards = 1;</code>
+ * @return The count of shards.
+ */
+ public int getShardsCount() {
+ return shards_.size();
+ }
+ /**
+ * <code>repeated string shards = 1;</code>
+ * @param index The index of the element to return.
+ * @return The shards at the given index.
+ */
+ public java.lang.String getShards(int index) {
+ return shards_.get(index);
+ }
+ /**
+ * <code>repeated string shards = 1;</code>
+ * @param index The index of the value to return.
+ * @return The bytes of the shards at the given index.
+ */
+ public org.apache.pekko.protobufv3.internal.ByteString
+ getShardsBytes(int index) {
+ return shards_.getByteString(index);
+ }
+ /**
+ * <code>repeated string shards = 1;</code>
+ * @param index The index to set the value at.
+ * @param value The shards to set.
+ * @return This builder for chaining.
+ */
+ public Builder setShards(
+ int index, java.lang.String value) {
+ if (value == null) { throw new NullPointerException(); }
+ ensureShardsIsMutable();
+ shards_.set(index, value);
+ bitField0_ |= 0x00000001;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>repeated string shards = 1;</code>
+ * @param value The shards to add.
+ * @return This builder for chaining.
+ */
+ public Builder addShards(
+ java.lang.String value) {
+ if (value == null) { throw new NullPointerException(); }
+ ensureShardsIsMutable();
+ shards_.add(value);
+ bitField0_ |= 0x00000001;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>repeated string shards = 1;</code>
+ * @param values The shards to add.
+ * @return This builder for chaining.
+ */
+ public Builder addAllShards(
+ java.lang.Iterable<java.lang.String> values) {
+ ensureShardsIsMutable();
+
org.apache.pekko.protobufv3.internal.AbstractMessageLite.Builder.addAll(
+ values, shards_);
+ bitField0_ |= 0x00000001;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>repeated string shards = 1;</code>
+ * @return This builder for chaining.
+ */
+ public Builder clearShards() {
+ shards_ =
+ org.apache.pekko.protobufv3.internal.LazyStringArrayList.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000001);;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>repeated string shards = 1;</code>
+ * @param value The bytes of the shards to add.
+ * @return This builder for chaining.
+ */
+ public Builder addShardsBytes(
+ org.apache.pekko.protobufv3.internal.ByteString value) {
+ if (value == null) { throw new NullPointerException(); }
+ ensureShardsIsMutable();
+ shards_.add(value);
+ bitField0_ |= 0x00000001;
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:StopShards)
+ }
+
+ // @@protoc_insertion_point(class_scope:StopShards)
+ private static final
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
DEFAULT_INSTANCE;
+ static {
+ DEFAULT_INSTANCE = new
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards();
+ }
+
+ public static
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
getDefaultInstance() {
+ return DEFAULT_INSTANCE;
+ }
+
+ private static final
org.apache.pekko.protobufv3.internal.Parser<StopShards>
+ PARSER = new
org.apache.pekko.protobufv3.internal.AbstractParser<StopShards>() {
+ @java.lang.Override
+ public StopShards parsePartialFrom(
+ org.apache.pekko.protobufv3.internal.CodedInputStream input,
+ org.apache.pekko.protobufv3.internal.ExtensionRegistryLite
extensionRegistry)
+ throws
org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException {
+ Builder builder = newBuilder();
+ try {
+ builder.mergeFrom(input, extensionRegistry);
+ } catch
(org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(builder.buildPartial());
+ } catch
(org.apache.pekko.protobufv3.internal.UninitializedMessageException e) {
+ throw
e.asInvalidProtocolBufferException().setUnfinishedMessage(builder.buildPartial());
+ } catch (java.io.IOException e) {
+ throw new
org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException(e)
+ .setUnfinishedMessage(builder.buildPartial());
+ }
+ return builder.buildPartial();
+ }
+ };
+
+ public static org.apache.pekko.protobufv3.internal.Parser<StopShards>
parser() {
+ return PARSER;
+ }
+
+ @java.lang.Override
+ public org.apache.pekko.protobufv3.internal.Parser<StopShards>
getParserForType() {
+ return PARSER;
+ }
+
+ @java.lang.Override
+ public
org.apache.pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages.StopShards
getDefaultInstanceForType() {
+ return DEFAULT_INSTANCE;
+ }
+
+ }
+
public interface AddressOrBuilder extends
// @@protoc_insertion_point(interface_extends:Address)
org.apache.pekko.protobufv3.internal.MessageOrBuilder {
@@ -19681,6 +20256,11 @@ public final class ClusterShardingMessages extends
org.apache.pekko.protobufv3.i
private static final
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
internal_static_CurrentRegions_fieldAccessorTable;
+ private static final
org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
+ internal_static_StopShards_descriptor;
+ private static final
+ org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
+ internal_static_StopShards_fieldAccessorTable;
private static final
org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
internal_static_Address_descriptor;
private static final
@@ -19751,18 +20331,19 @@ public final class ClusterShardingMessages extends
org.apache.pekko.protobufv3.i
"tatsEntry\"X\n\031ClusterShardingStatsEntry\022\031" +
"\n\007address\030\001 \002(\0132\010.Address\022 \n\005stats\030\002
\002(\013" +
"2\021.ShardRegionStats\"+\n\016CurrentRegions\022\031\n" +
- "\007regions\030\001
\003(\0132\010.Address\"K\n\007Address\022\020\n\010p" +
- "rotocol\030\001 \002(\t\022\016\n\006system\030\002
\002(\t\022\020\n\010hostnam" +
- "e\030\003 \002(\t\022\014\n\004port\030\004
\002(\r\"\037\n\013StartEntity\022\020\n\010" +
- "entityId\030\001 \002(\t\"3\n\016StartEntityAck\022\020\n\010enti" +
- "tyId\030\001 \002(\t\022\017\n\007shardId\030\002
\002(\t\"7\n\021CurrentSh" +
+ "\007regions\030\001 \003(\0132\010.Address\"\034\n\nStopShards\022\016"
+
+ "\n\006shards\030\001
\003(\t\"K\n\007Address\022\020\n\010protocol\030\001 " +
+ "\002(\t\022\016\n\006system\030\002
\002(\t\022\020\n\010hostname\030\003 \002(\t\022\014\n" +
+ "\004port\030\004
\002(\r\"\037\n\013StartEntity\022\020\n\010entityId\030\001" +
+ " \002(\t\"3\n\016StartEntityAck\022\020\n\010entityId\030\001 \002(\t" +
+ "\022\017\n\007shardId\030\002
\002(\t\"7\n\021CurrentShardState\022\017" +
+ "\n\007shardId\030\001 \002(\t\022\021\n\tentityIds\030\002
\003(\t\"0\n\nSh" +
"ardState\022\017\n\007shardId\030\001
\002(\t\022\021\n\tentityIds\030\002" +
- " \003(\t\"0\n\nShardState\022\017\n\007shardId\030\001
\002(\t\022\021\n\te" +
- "ntityIds\030\002 \003(\t\"F\n\027CurrentShardRegionStat" +
- "e\022\033\n\006shards\030\001
\003(\0132\013.ShardState\022\016\n\006failed" +
- "\030\002 \003(\t\"7\n\024RememberedShardState\022\017\n\007shardI" +
- "d\030\001 \003(\t\022\016\n\006marker\030\002
\001(\010B2\n.org.apache.pe" +
- "kko.cluster.sharding.protobuf.msgH\001"
+ " \003(\t\"F\n\027CurrentShardRegionState\022\033\n\006shard" +
+ "s\030\001 \003(\0132\013.ShardState\022\016\n\006failed\030\002
\003(\t\"7\n\024" +
+ "RememberedShardState\022\017\n\007shardId\030\001 \003(\t\022\016\n" +
+ "\006marker\030\002 \001(\010B2\n.org.apache.pekko.cluste" +
+ "r.sharding.protobuf.msgH\001"
};
descriptor =
org.apache.pekko.protobufv3.internal.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
@@ -19888,44 +20469,50 @@ public final class ClusterShardingMessages extends
org.apache.pekko.protobufv3.i
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable(
internal_static_CurrentRegions_descriptor,
new java.lang.String[] { "Regions", });
- internal_static_Address_descriptor =
+ internal_static_StopShards_descriptor =
getDescriptor().getMessageType(19);
+ internal_static_StopShards_fieldAccessorTable = new
+ org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable(
+ internal_static_StopShards_descriptor,
+ new java.lang.String[] { "Shards", });
+ internal_static_Address_descriptor =
+ getDescriptor().getMessageType(20);
internal_static_Address_fieldAccessorTable = new
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable(
internal_static_Address_descriptor,
new java.lang.String[] { "Protocol", "System", "Hostname", "Port", });
internal_static_StartEntity_descriptor =
- getDescriptor().getMessageType(20);
+ getDescriptor().getMessageType(21);
internal_static_StartEntity_fieldAccessorTable = new
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable(
internal_static_StartEntity_descriptor,
new java.lang.String[] { "EntityId", });
internal_static_StartEntityAck_descriptor =
- getDescriptor().getMessageType(21);
+ getDescriptor().getMessageType(22);
internal_static_StartEntityAck_fieldAccessorTable = new
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable(
internal_static_StartEntityAck_descriptor,
new java.lang.String[] { "EntityId", "ShardId", });
internal_static_CurrentShardState_descriptor =
- getDescriptor().getMessageType(22);
+ getDescriptor().getMessageType(23);
internal_static_CurrentShardState_fieldAccessorTable = new
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable(
internal_static_CurrentShardState_descriptor,
new java.lang.String[] { "ShardId", "EntityIds", });
internal_static_ShardState_descriptor =
- getDescriptor().getMessageType(23);
+ getDescriptor().getMessageType(24);
internal_static_ShardState_fieldAccessorTable = new
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable(
internal_static_ShardState_descriptor,
new java.lang.String[] { "ShardId", "EntityIds", });
internal_static_CurrentShardRegionState_descriptor =
- getDescriptor().getMessageType(24);
+ getDescriptor().getMessageType(25);
internal_static_CurrentShardRegionState_fieldAccessorTable = new
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable(
internal_static_CurrentShardRegionState_descriptor,
new java.lang.String[] { "Shards", "Failed", });
internal_static_RememberedShardState_descriptor =
- getDescriptor().getMessageType(25);
+ getDescriptor().getMessageType(26);
internal_static_RememberedShardState_fieldAccessorTable = new
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable(
internal_static_RememberedShardState_descriptor,
diff --git a/cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto
b/cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto
index 9ac9243c08..fe650ccd22 100644
--- a/cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto
+++ b/cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto
@@ -111,6 +111,10 @@ message CurrentRegions {
repeated Address regions = 1;
}
+message StopShards {
+ repeated string shards = 1;
+}
+
/**
* Defines a remote address.
*/
diff --git
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala
index 1729d14e11..538f5cde2d 100644
---
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala
+++
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala
@@ -541,6 +541,8 @@ object ShardCoordinator {
private final case class DelayedShardRegionTerminated(region: ActorRef)
+ private final case class StopShardTimeout(requestId: java.util.UUID)
+
/**
* Result of `allocateShard` is piped to self with this message.
*/
@@ -1107,14 +1109,64 @@ abstract class ShardCoordinator(
case DelayedShardRegionTerminated(ref) =>
regionTerminated(ref)
- case StopShards(shards) =>
- val requestId = java.util.UUID.randomUUID()
- shards.foreach { shard =>
- waitingForShardsToStop = waitingForShardsToStop.updated(
- shard,
- waitingForShardsToStop.getOrElse(shard, Set.empty) + ((sender(),
requestId)))
+ case StopShards(shardIds) =>
+ if (settings.rememberEntities) {
+ log.error(
+ "{}: Stop shards cannot be combined with remember entities, ignoring
command to stop [{}]",
+ typeName,
+ shardIds.mkString(", "))
+ } else if (state.regions.nonEmpty && !preparingForShutdown) {
+ val requestId = java.util.UUID.randomUUID()
+ val (runningShards, alreadyStoppedShards) =
shardIds.partition(state.shards.contains)
+ alreadyStoppedShards.foreach(shardId => sender() !
ShardStopped(shardId))
+ if (runningShards.nonEmpty) {
+ waitingForShardsToStop =
runningShards.foldLeft(waitingForShardsToStop) {
+ case (acc, shard) =>
+ val newWaiting = acc.get(shard) match {
+ case Some(waiting) => waiting + ((sender(), requestId))
+ case None => Set((sender(), requestId))
+ }
+ acc.updated(shard, newWaiting)
+ }
+ // no need to stop already rebalancing
+ val shardsToStop = runningShards.filter(shard =>
!rebalanceInProgress.contains(shard))
+ log.info(
+ "{}: Explicitly stopping shards [{}] (request id [{}])",
+ typeName,
+ shardsToStop.mkString(", "),
+ requestId)
+ val shardsPerRegion =
+ shardsToStop.flatMap(shardId =>
state.shards.get(shardId).map(region => region -> shardId)).groupBy(_._1)
+ shardsPerRegion.foreach {
+ case (region, shards) => shutdownShards(region, shards.map(_._2))
+ }
+ val timeout = StopShardTimeout(requestId)
+ timers.startSingleTimer(timeout, timeout,
settings.tuningParameters.handOffTimeout)
+ }
+ } else {
+ log.warning(
+ "{}: Explicit stop shards of shards [{}] ignored (no known regions
or sharding shutting down)",
+ typeName,
+ shardIds.mkString(", "))
+ }
+
+ case StopShardTimeout(requestId) =>
+ val timedOutShards = waitingForShardsToStop.collect {
+ case (shard, waiting) if waiting.exists(_._2 == requestId) => shard
+ }
+ if (timedOutShards.nonEmpty) {
+ log.info(
+ "{}: Stop shard request [{}] timed out for shards [{}]",
+ typeName,
+ requestId,
+ timedOutShards.mkString(", "))
+ waitingForShardsToStop =
timedOutShards.foldLeft(waitingForShardsToStop) {
+ case (acc, shard) =>
+ val waiting = acc(shard)
+ if (waiting.size == 1) acc - shard
+ else acc.updated(shard, waiting.filterNot { case (_, id) => id ==
requestId })
+ }
}
- shutdownShards(self, shards.filter(state.shards.contains))
}
def update[E <: DomainEvent](evt: E)(f: E => Unit): Unit
diff --git
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala
index 6951a109a8..2afa4e7303 100644
---
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala
+++
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala
@@ -106,6 +106,8 @@ private[pekko] class ClusterShardingMessageSerializer(val
system: ExtendedActorS
private val EventSourcedRememberShardsMigrationMarkerManifest = "SM"
private val EventSourcedRememberShardsState = "SS"
+ private val StopShardsManifest = "ST"
+
private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte]
=> AnyRef](
EntityStateManifest -> entityStateFromBinary,
EntityStartedManifest -> entityStartedFromBinary,
@@ -213,7 +215,8 @@ private[pekko] class ClusterShardingMessageSerializer(val
system: ExtendedActorS
},
EventSourcedRememberShardsState -> { bytes =>
rememberShardsStateFromBinary(bytes)
- })
+ },
+ StopShardsManifest -> stopShardsFromBinary)
override def manifest(obj: AnyRef): String = obj match {
case _: EntityState => EntityStateManifest
@@ -264,6 +267,8 @@ private[pekko] class ClusterShardingMessageSerializer(val
system: ExtendedActorS
case MigrationMarker =>
EventSourcedRememberShardsMigrationMarkerManifest
case _: RememberShardsState => EventSourcedRememberShardsState
+ case _: StopShards => StopShardsManifest
+
case _ =>
throw new IllegalArgumentException(s"Can't serialize object of type
${obj.getClass} in [${getClass.getName}]")
}
@@ -318,6 +323,8 @@ private[pekko] class ClusterShardingMessageSerializer(val
system: ExtendedActorS
case MigrationMarker => Array.emptyByteArray
case m: RememberShardsState => rememberShardsStateToProto(m).toByteArray
+ case ss: StopShards => stopShardsToProto(ss).toByteArray
+
case _ =>
throw new IllegalArgumentException(s"Can't serialize object of type
${obj.getClass} in [${getClass.getName}]")
}
@@ -342,6 +349,17 @@ private[pekko] class ClusterShardingMessageSerializer(val
system: ExtendedActorS
RememberShardsState(proto.getShardIdList.asScala.toSet, proto.getMarker)
}
+ private def stopShardsFromBinary(bytes: Array[Byte]): StopShards = {
+ val proto = sm.StopShards.parseFrom(bytes)
+ StopShards(proto.getShardsList.asScala.toSet)
+ }
+
+ private def stopShardsToProto(ss: StopShards): sm.StopShards = {
+ val builder = sm.StopShards.newBuilder()
+ builder.addAllShards(ss.shards.asJava)
+ builder.build()
+ }
+
private def coordinatorStateToProto(state: State): sm.CoordinatorState = {
val builder = sm.CoordinatorState.newBuilder()
diff --git
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/StopShardsSpec.scala
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/StopShardsSpec.scala
new file mode 100644
index 0000000000..181eae6cc1
--- /dev/null
+++
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/StopShardsSpec.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2019-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.cluster.sharding
+
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.actor.Actor
+import pekko.actor.ActorLogging
+import pekko.actor.ActorRef
+import pekko.actor.ActorSystem
+import pekko.actor.Props
+import pekko.actor.Terminated
+import pekko.cluster.Cluster
+import pekko.cluster.MemberStatus
+import pekko.cluster.sharding.ShardCoordinator.Internal.ShardStopped
+import pekko.cluster.sharding.ShardRegion.CurrentRegions
+import pekko.cluster.sharding.ShardRegion.GetCurrentRegions
+import pekko.testkit.DeadLettersFilter
+import pekko.testkit.PekkoSpec
+import pekko.testkit.TestEvent.Mute
+import pekko.testkit.TestProbe
+import pekko.testkit.WithLogCapturing
+
+import com.typesafe.config.ConfigFactory
+
+object StopShardsSpec {
+
+ def config =
+ ConfigFactory.parseString("""
+ pekko.loglevel = DEBUG
+ pekko.loggers =
["org.apache.pekko.testkit.SilenceAllTestEventListener"]
+ pekko.actor.provider = "cluster"
+ pekko.remote.artery.canonical.port = 0
+ pekko.test.single-expect-default = 5 s
+ pekko.cluster.sharding.distributed-data.durable.keys = []
+ pekko.cluster.sharding.remember-entities = off
+ pekko.cluster.downing-provider-class =
org.apache.pekko.cluster.testkit.AutoDowning
+ pekko.cluster.jmx.enabled = off
+ pekko.cluster.sharding.verbose-debug-logging = on
+ pekko.cluster.sharding.fail-on-invalid-entity-state-transition = on
+ pekko.remote.artery.canonical.hostname = "127.0.0.1"
+ """)
+
+ val shardTypeName = "stopping-entities"
+
+ val numberOfShards = 3
+
+ val extractEntityId: ShardRegion.ExtractEntityId = {
+ case msg: Int => (msg.toString, msg)
+ case _ => throw new IllegalArgumentException()
+ }
+
+ val extractShardId: ShardRegion.ExtractShardId = {
+ case msg: Int => (msg % 10).toString
+ case ShardRegion.StartEntity(id) => (id.toLong % numberOfShards).toString
+ case _ => throw new IllegalArgumentException()
+ }
+
+ class EntityActor extends Actor with ActorLogging {
+ override def receive: Receive = {
+ case _ =>
+ log.debug("ping")
+ sender() ! context.self
+ }
+ }
+}
+
+class StopShardsSpec extends PekkoSpec(StopShardsSpec.config) with
WithLogCapturing {
+ import StopShardsSpec._
+
+ // mute logging of deadLetters
+ system.eventStream.publish(Mute(DeadLettersFilter[Any]))
+
+ private val sysA = system
+ private val sysB = ActorSystem(system.name, system.settings.config)
+
+ private val pA = TestProbe()(sysA)
+ private val pB = TestProbe()(sysB)
+
+ private val regionA = startShardRegion(sysA)
+ private val regionB = startShardRegion(sysB)
+
+ override protected def beforeTermination(): Unit = {
+ shutdown(sysB)
+ super.beforeTermination()
+ }
+
+ "The StopShards command" must {
+
+ "form a cluster" in {
+ Cluster(sysA).join(Cluster(sysA).selfAddress) // coordinator on A
+ awaitAssert(Cluster(sysA).selfMember.status shouldEqual MemberStatus.Up,
3.seconds)
+
+ Cluster(sysB).join(Cluster(sysA).selfAddress)
+ awaitAssert(Cluster(sysB).selfMember.status shouldEqual MemberStatus.Up,
3.seconds)
+
+ // wait for all regions to be registered
+ pA.awaitAssert({
+ regionA.tell(GetCurrentRegions, pA.ref)
+ pA.expectMsgType[CurrentRegions].regions should have size 2
+ }, 10.seconds)
+ }
+
+ "start entities in a few shards, then stop the shards" in {
+
+ val allShards = (1 to 10).map { i =>
+ regionA.tell(i, pA.ref)
+ val entityRef = pA.expectMsgType[ActorRef]
+ pA.watch(entityRef) // so we can verify terminated later
+ extractShardId(i)
+ }.toSet
+
+ regionB.tell(ShardCoordinator.Internal.StopShards(allShards), pB.ref)
+ (1 to allShards.size).foreach { _ =>
+ // one ack for each shard stopped
+ pB.expectMsgType[ShardStopped].shard
+ }
+
+ // all entities stopped
+ (1 to 10).foreach(_ => pA.expectMsgType[Terminated])
+ }
+ }
+
+ def startShardRegion(sys: ActorSystem): ActorRef =
+ ClusterSharding(sys).start(
+ shardTypeName,
+ Props[EntityActor](),
+ ClusterShardingSettings(system),
+ extractEntityId,
+ extractShardId)
+
+}
diff --git
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala
index 63a70b65a2..e4b5a790f5 100644
---
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala
+++
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala
@@ -81,6 +81,10 @@ class ClusterShardingMessageSerializerSpec extends PekkoSpec
{
checkSerialization(GracefulShutdownReq(region1))
}
+ "be able to serialize StopShards" in {
+ checkSerialization(StopShards(Set("a", "b", "c")))
+ }
+
"be able to serialize PersistentShard snapshot state" in {
checkSerialization(EventSourcedRememberEntitiesShardStore.State(Set("e1", "e2",
"e3")))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]