This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new a376fbcdb8 [ISSUE #7634] Introduce controllableOffset to prevent
unnecessary suspension when OFFSET_ILLEGAL (#7635)
a376fbcdb8 is described below
commit a376fbcdb82e818cfa239da677669f1118e4c40f
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Tue Dec 12 16:52:12 2023 +0800
[ISSUE #7634] Introduce controllableOffset to prevent unnecessary
suspension when OFFSET_ILLEGAL (#7635)
* Add controllableOffset to prevent unnecessary suspension when
OFFSET_ILLEGAL
---
.../client/consumer/store/ControllableOffset.java | 115 +++++++++++++++++++++
.../consumer/store/LocalFileOffsetStore.java | 33 +++---
.../client/consumer/store/OffsetStore.java | 8 ++
.../consumer/store/RemoteBrokerOffsetStore.java | 45 ++++----
.../impl/consumer/DefaultMQPushConsumerImpl.java | 13 ++-
.../client/impl/consumer/PullMessageService.java | 8 ++
.../store/RemoteBrokerOffsetStoreTest.java | 32 ++++++
7 files changed, 218 insertions(+), 36 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/store/ControllableOffset.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/ControllableOffset.java
new file mode 100644
index 0000000000..9db4bd2e2a
--- /dev/null
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/ControllableOffset.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.consumer.store;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * The ControllableOffset class encapsulates a thread-safe offset value that can be
+ * updated atomically. Additionally, this class allows for the offset to be "frozen",
+ * which prevents further updates after the freeze operation has been performed.
+ * <p>
+ * Concurrency Scenarios:
+ * If {@code updateAndFreeze} is called before any {@code update} operation, it sets
+ * {@code allowToUpdate} to false and updates the offset to the specified target value.
+ * After this, further invocations of {@code update} do not affect the frozen offset.
+ * <p>
+ * If {@code update} is in progress while {@code updateAndFreeze} is invoked concurrently,
+ * the final outcome depends on the sequence of operations:
+ * 1. If {@code update}'s atomic update operation completes before {@code updateAndFreeze},
+ * the latter overwrites the offset and sets {@code allowToUpdate} to false, preventing further updates.
+ * 2. If {@code updateAndFreeze} executes before the {@code update} finalizes its operation,
+ * the ongoing {@code update} is intended not to proceed with its changes: both operations go
+ * through {@link AtomicLong#getAndUpdate}, and {@code update} re-checks {@code allowToUpdate}
+ * inside its update function.
+ * NOTE(review): this appears to rely on the freeze changing the stored value; if the freeze
+ * target equals the value an in-flight {@code update} already read, that update's
+ * compare-and-set could presumably still succeed after the freeze - confirm.
+ * <p>
+ * Once {@code updateAndFreeze} has completed, any {@code update} call that starts afterwards
+ * leaves the offset unchanged, because it observes the volatile {@code allowToUpdate} flag
+ * as false. Nothing in this class sets the flag back to true.
+ * <p>
+ * An AtomicLong holds the offset value and a volatile boolean flag controls whether updates apply.
+ */
+public class ControllableOffset {
+ // Holds the current offset value in an atomic way.
+ private final AtomicLong value;
+ // Controls whether updates to the offset are allowed; starts true and is only ever cleared.
+ private volatile boolean allowToUpdate;
+
+ public ControllableOffset(long value) {
+ this.value = new AtomicLong(value);
+ this.allowToUpdate = true;
+ }
+
+ /**
+ * Attempts to update the offset to the target value. If increaseOnly is true,
+ * the offset will not be decreased. The update operation is atomic and thread-safe.
+ * The allowToUpdate flag is checked both before and inside the atomic update; if the offset
+ * has been frozen by a previous call to {@link #updateAndFreeze(long)},
+ * this method will not update the offset.
+ *
+ * @param target the new target offset value.
+ * @param increaseOnly if true, the offset will only be updated if the target value
+ * is greater than the current value.
+ */
+ public void update(long target, boolean increaseOnly) {
+ if (allowToUpdate) {
+ value.getAndUpdate(val -> {
+ if (allowToUpdate) {
+ if (increaseOnly) {
+ return Math.max(target, val);
+ } else {
+ return target;
+ }
+ } else {
+ return val;
+ }
+ });
+ }
+ }
+
+ /**
+ * Updates the offset without the increase-only check; a frozen offset is still left unchanged.
+ *
+ * @param target The new target value for the offset.
+ */
+ public void update(long target) {
+ update(target, false);
+ }
+
+ /**
+ * Freezes the offset at the target value provided. Once frozen, the offset
+ * cannot be updated by subsequent calls to {@link #update(long, boolean)}.
+ * The allowToUpdate flag is set to false inside the atomic update function (an idempotent
+ * write, so re-applying the function is harmless) and the function then yields the target.
+ *
+ * @param target the new target offset value to freeze at.
+ */
+ public void updateAndFreeze(long target) {
+ value.getAndUpdate(val -> {
+ allowToUpdate = false;
+ return target;
+ });
+ }
+
+ public long getOffset() {
+ return value.get();
+ }
+}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
index 832888dbeb..074508c46b 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
@@ -47,7 +47,7 @@ public class LocalFileOffsetStore implements OffsetStore {
private final MQClientInstance mQClientFactory;
private final String groupName;
private final String storePath;
- private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
+ private ConcurrentMap<MessageQueue, ControllableOffset> offsetTable =
new ConcurrentHashMap<>();
public LocalFileOffsetStore(MQClientInstance mQClientFactory, String
groupName) {
@@ -63,10 +63,9 @@ public class LocalFileOffsetStore implements OffsetStore {
public void load() throws MQClientException {
OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
if (offsetSerializeWrapper != null &&
offsetSerializeWrapper.getOffsetTable() != null) {
- offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
-
for (Entry<MessageQueue, AtomicLong> mqEntry :
offsetSerializeWrapper.getOffsetTable().entrySet()) {
AtomicLong offset = mqEntry.getValue();
+ offsetTable.put(mqEntry.getKey(), new
ControllableOffset(offset.get()));
log.info("load consumer's offset, {} {} {}",
this.groupName,
mqEntry.getKey(),
@@ -78,30 +77,38 @@ public class LocalFileOffsetStore implements OffsetStore {
@Override
public void updateOffset(MessageQueue mq, long offset, boolean
increaseOnly) {
if (mq != null) {
- AtomicLong offsetOld = this.offsetTable.get(mq);
+ ControllableOffset offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
- offsetOld = this.offsetTable.putIfAbsent(mq, new
AtomicLong(offset));
+ offsetOld = this.offsetTable.putIfAbsent(mq, new
ControllableOffset(offset));
}
if (null != offsetOld) {
if (increaseOnly) {
- MixAll.compareAndIncreaseOnly(offsetOld, offset);
+ offsetOld.update(offset, true);
} else {
- offsetOld.set(offset);
+ offsetOld.update(offset);
}
}
}
}
+ @Override
+ public void updateAndFreezeOffset(MessageQueue mq, long offset) {
+ if (mq != null) {
+ this.offsetTable.computeIfAbsent(mq, k -> new
ControllableOffset(offset))
+ .updateAndFreeze(offset);
+ }
+ }
+
@Override
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
if (mq != null) {
switch (type) {
case MEMORY_FIRST_THEN_STORE:
case READ_FROM_MEMORY: {
- AtomicLong offset = this.offsetTable.get(mq);
+ ControllableOffset offset = this.offsetTable.get(mq);
if (offset != null) {
- return offset.get();
+ return offset.getOffset();
} else if (ReadOffsetType.READ_FROM_MEMORY == type) {
return -1;
}
@@ -135,9 +142,9 @@ public class LocalFileOffsetStore implements OffsetStore {
return;
OffsetSerializeWrapper offsetSerializeWrapper = new
OffsetSerializeWrapper();
- for (Map.Entry<MessageQueue, AtomicLong> entry :
this.offsetTable.entrySet()) {
+ for (Map.Entry<MessageQueue, ControllableOffset> entry :
this.offsetTable.entrySet()) {
if (mqs.contains(entry.getKey())) {
- AtomicLong offset = entry.getValue();
+ AtomicLong offset = new
AtomicLong(entry.getValue().getOffset());
offsetSerializeWrapper.getOffsetTable().put(entry.getKey(),
offset);
}
}
@@ -170,12 +177,12 @@ public class LocalFileOffsetStore implements OffsetStore {
@Override
public Map<MessageQueue, Long> cloneOffsetTable(String topic) {
Map<MessageQueue, Long> cloneOffsetTable = new
HashMap<>(this.offsetTable.size(), 1);
- for (Map.Entry<MessageQueue, AtomicLong> entry :
this.offsetTable.entrySet()) {
+ for (Map.Entry<MessageQueue, ControllableOffset> entry :
this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) {
continue;
}
- cloneOffsetTable.put(mq, entry.getValue().get());
+ cloneOffsetTable.put(mq, entry.getValue().getOffset());
}
return cloneOffsetTable;
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java
index 9deed0e3df..ecceedee17 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java
@@ -37,6 +37,14 @@ public interface OffsetStore {
*/
void updateOffset(final MessageQueue mq, final long offset, final boolean
increaseOnly);
+ /**
+ * Update and freeze the message queue to prevent concurrent update action
+ *
+ * @param mq target message queue
+ * @param offset expect update offset
+ */
+ void updateAndFreezeOffset(final MessageQueue mq, final long offset);
+
/**
* Get offset from local storage
*
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 900e822114..83d5061adb 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -22,7 +22,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.OffsetNotFoundException;
@@ -31,11 +30,11 @@ import
org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingException;
import
org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
/**
* Remote storage implementation
@@ -44,7 +43,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
private final static Logger log =
LoggerFactory.getLogger(RemoteBrokerOffsetStore.class);
private final MQClientInstance mQClientFactory;
private final String groupName;
- private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
+ private ConcurrentMap<MessageQueue, ControllableOffset> offsetTable =
new ConcurrentHashMap<>();
public RemoteBrokerOffsetStore(MQClientInstance mQClientFactory, String
groupName) {
@@ -59,30 +58,38 @@ public class RemoteBrokerOffsetStore implements OffsetStore
{
@Override
public void updateOffset(MessageQueue mq, long offset, boolean
increaseOnly) {
if (mq != null) {
- AtomicLong offsetOld = this.offsetTable.get(mq);
+ ControllableOffset offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
- offsetOld = this.offsetTable.putIfAbsent(mq, new
AtomicLong(offset));
+ offsetOld = this.offsetTable.putIfAbsent(mq, new
ControllableOffset(offset));
}
if (null != offsetOld) {
if (increaseOnly) {
- MixAll.compareAndIncreaseOnly(offsetOld, offset);
+ offsetOld.update(offset, true);
} else {
- offsetOld.set(offset);
+ offsetOld.update(offset);
}
}
}
}
+ @Override
+ public void updateAndFreezeOffset(MessageQueue mq, long offset) {
+ if (mq != null) {
+ this.offsetTable.computeIfAbsent(mq, k -> new
ControllableOffset(offset))
+ .updateAndFreeze(offset);
+ }
+ }
+
@Override
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
if (mq != null) {
switch (type) {
case MEMORY_FIRST_THEN_STORE:
case READ_FROM_MEMORY: {
- AtomicLong offset = this.offsetTable.get(mq);
+ ControllableOffset offset = this.offsetTable.get(mq);
if (offset != null) {
- return offset.get();
+ return offset.getOffset();
} else if (ReadOffsetType.READ_FROM_MEMORY == type) {
return -1;
}
@@ -118,18 +125,18 @@ public class RemoteBrokerOffsetStore implements
OffsetStore {
final HashSet<MessageQueue> unusedMQ = new HashSet<>();
- for (Map.Entry<MessageQueue, AtomicLong> entry :
this.offsetTable.entrySet()) {
+ for (Map.Entry<MessageQueue, ControllableOffset> entry :
this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
- AtomicLong offset = entry.getValue();
+ ControllableOffset offset = entry.getValue();
if (offset != null) {
if (mqs.contains(mq)) {
try {
- this.updateConsumeOffsetToBroker(mq, offset.get());
+ this.updateConsumeOffsetToBroker(mq,
offset.getOffset());
log.info("[persistAll] Group: {} ClientId: {}
updateConsumeOffsetToBroker {} {}",
this.groupName,
this.mQClientFactory.getClientId(),
mq,
- offset.get());
+ offset.getOffset());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " +
mq.toString(), e);
}
@@ -149,15 +156,15 @@ public class RemoteBrokerOffsetStore implements
OffsetStore {
@Override
public void persist(MessageQueue mq) {
- AtomicLong offset = this.offsetTable.get(mq);
+ ControllableOffset offset = this.offsetTable.get(mq);
if (offset != null) {
try {
- this.updateConsumeOffsetToBroker(mq, offset.get());
+ this.updateConsumeOffsetToBroker(mq, offset.getOffset());
log.info("[persist] Group: {} ClientId: {}
updateConsumeOffsetToBroker {} {}",
this.groupName,
this.mQClientFactory.getClientId(),
mq,
- offset.get());
+ offset.getOffset());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " +
mq.toString(), e);
}
@@ -175,12 +182,12 @@ public class RemoteBrokerOffsetStore implements
OffsetStore {
@Override
public Map<MessageQueue, Long> cloneOffsetTable(String topic) {
Map<MessageQueue, Long> cloneOffsetTable = new
HashMap<>(this.offsetTable.size(), 1);
- for (Map.Entry<MessageQueue, AtomicLong> entry :
this.offsetTable.entrySet()) {
+ for (Map.Entry<MessageQueue, ControllableOffset> entry :
this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) {
continue;
}
- cloneOffsetTable.put(mq, entry.getValue().get());
+ cloneOffsetTable.put(mq, entry.getValue().getOffset());
}
return cloneOffsetTable;
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index cfb89b5c88..d2a362ba56 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -404,16 +404,17 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
-
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
+ DefaultMQPushConsumerImpl.this.executeTask(new
Runnable() {
@Override
public void run() {
try {
-
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
- pullRequest.getNextOffset(),
false);
+
DefaultMQPushConsumerImpl.this.offsetStore.updateAndFreezeOffset(pullRequest.getMessageQueue(),
+ pullRequest.getNextOffset());
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
+ // removeProcessQueue will also remove
offset to cancel the frozen status.
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset,
{}", pullRequest);
@@ -421,7 +422,7 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
log.error("executeTaskLater
Exception", e);
}
}
- }, 10000);
+ });
break;
default:
break;
@@ -705,6 +706,10 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
this.mQClientFactory.getPullMessageService().executeTaskLater(r,
timeDelay);
}
+ public void executeTask(final Runnable r) {
+ this.mQClientFactory.getPullMessageService().executeTask(r);
+ }
+
public QueryResult queryMessage(String topic, String key, int maxNum, long
begin, long end)
throws MQClientException, InterruptedException {
return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key,
maxNum, begin, end);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
index b5e6f9f790..ec6ede6bde 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
@@ -90,6 +90,14 @@ public class PullMessageService extends ServiceThread {
}
}
+ public void executeTask(final Runnable r) {
+ if (!isStopped()) {
+ this.scheduledExecutorService.execute(r);
+ } else {
+ logger.warn("PullMessageServiceScheduledThread has shutdown");
+ }
+ }
+
public ScheduledExecutorService getScheduledExecutorService() {
return scheduledExecutorService;
}
diff --git
a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
index 33ea2b04b8..ba6911e3e8 100644
---
a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
@@ -81,6 +81,38 @@ public class RemoteBrokerOffsetStoreTest {
assertThat(offsetStore.readOffset(messageQueue,
ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1023);
}
+ @Test
+ public void testUpdateAndFreezeOffset() throws Exception {
+ OffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory,
group);
+ MessageQueue messageQueue = new MessageQueue(topic, brokerName, 1);
+
+ offsetStore.updateAndFreezeOffset(messageQueue, 1024);
+ assertThat(offsetStore.readOffset(messageQueue,
ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);
+
+ offsetStore.updateOffset(messageQueue, 1023, false);
+ assertThat(offsetStore.readOffset(messageQueue,
ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);
+
+ offsetStore.updateOffset(messageQueue, 1022, true);
+ assertThat(offsetStore.readOffset(messageQueue,
ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);
+ }
+
+ @Test
+ public void testUpdateAndFreezeOffsetWithRemove() throws Exception {
+ OffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory,
group);
+ MessageQueue messageQueue = new MessageQueue(topic, brokerName, 1);
+
+ offsetStore.updateAndFreezeOffset(messageQueue, 1024);
+ assertThat(offsetStore.readOffset(messageQueue,
ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);
+
+ offsetStore.updateOffset(messageQueue, 1023, false);
+ assertThat(offsetStore.readOffset(messageQueue,
ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);
+
+ offsetStore.removeOffset(messageQueue);
+ assertThat(offsetStore.readOffset(messageQueue,
ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(-1);
+ offsetStore.updateOffset(messageQueue, 1023, false);
+ assertThat(offsetStore.readOffset(messageQueue,
ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1023);
+ }
+
@Test
public void testReadOffset_WithException() throws Exception {
OffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory,
group);