This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f1a664f5d68 KAFKA-19858 Set default min.insync.replicas=2 for 
__remote_log_metadata topic to prevent data loss (KIP-1235) (#20811)
f1a664f5d68 is described below

commit f1a664f5d68762334d863f717cf7a95bcf0135a3
Author: Jian <[email protected]>
AuthorDate: Mon Dec 29 11:12:25 2025 +0800

    KAFKA-19858 Set default min.insync.replicas=2 for __remote_log_metadata 
topic to prevent data loss (KIP-1235) (#20811)
    
    [KIP-1235](https://cwiki.apache.org/confluence/x/yommFw)
    
    The __remote_log_metadata internal topic currently lacks a configurable
    min.insync.replicas setting and relies on the broker-level default
    (typically 1), even though its replication factor defaults to 3. This
    creates a data loss risk in production environments, as writes may be
    acknowledged by only a single replica.
    
    Moreover, this is inconsistent with other internal topics such as
    __transaction_state, which explicitly sets min.insync.replicas=2 via the
    transaction.state.log.min.isr broker configuration. Both topics store
    critical metadata and should have equivalent durability guarantees.
    
    Note: the topic's replication factor is already configurable and defaults to 3:
    ```
    public static final String
    REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP =
    "rlmm.config.remote.log.metadata.topic.replication.factor";
    public static final short
    DEFAULT_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR = 3;
    ```
    
    Reviewers: Kamal Chandraprakash <[email protected]>,
     Chia-Ping Tsai <[email protected]>
---
 docs/getting-started/upgrade.md                    |  2 +-
 docs/operations/tiered-storage.md                  |  6 ++-
 .../TopicBasedRemoteLogMetadataManager.java        |  1 +
 .../TopicBasedRemoteLogMetadataManagerConfig.java  | 12 +++++
 .../storage/RemoteLogMetadataManagerTestUtils.java |  2 +-
 ...picBasedRemoteLogMetadataManagerConfigTest.java | 23 +++++++++
 .../TopicBasedRemoteLogMetadataManagerTest.java    | 56 ++++++++++++++++++++++
 7 files changed, 99 insertions(+), 3 deletions(-)

diff --git a/docs/getting-started/upgrade.md b/docs/getting-started/upgrade.md
index 1adf5110659..3ae4c68ce6f 100644
--- a/docs/getting-started/upgrade.md
+++ b/docs/getting-started/upgrade.md
@@ -33,7 +33,7 @@ type: docs
 ### Notable changes in 4.3.0
 
   * Two new configs have been introduced: 
`group.coordinator.cached.buffer.max.bytes` and 
`share.coordinator.cached.buffer.max.bytes`. They allow the respective 
coordinators to set the maximum buffer size retained for reuse. For further 
details, please refer to 
[KIP-1196](https://cwiki.apache.org/confluence/x/hA5JFg). 
-
+  * A new config has been introduced: `remote.log.metadata.topic.min.isr`, 
with a default value of 2. You can update min.insync.replicas for an 
existing __remote_log_metadata topic via kafka-configs.sh if needed. For further 
details, please refer to 
[KIP-1235](https://cwiki.apache.org/confluence/x/yommFw).
 
 
 ## Upgrading to 4.2.0
diff --git a/docs/operations/tiered-storage.md 
b/docs/operations/tiered-storage.md
index ca6376be566..5da14595279 100644
--- a/docs/operations/tiered-storage.md
+++ b/docs/operations/tiered-storage.md
@@ -89,9 +89,13 @@ After build successfully, there should be a 
`kafka-storage-x.x.x-test.jar` file
     # Note, please make sure the brokers need to have access to this directory
     rsm.config.dir=/tmp/kafka-remote-storage
     
-    # This needs to be changed if number of brokers in the cluster is more 
than 1
+    # For a single-broker cluster, set this to 1. The default is 3, which 
requires at least 3 brokers.
     rlmm.config.remote.log.metadata.topic.replication.factor=1
     
+    # The minimum number of replicas that must acknowledge a write to the remote 
log metadata topic.
+    # The default value is 2. For a single-broker cluster (replication factor = 1), 
set this to 1.
+    rlmm.config.remote.log.metadata.topic.min.isr=1
+    
     # Try to speed up the log retention check interval for testing
     log.retention.check.interval.ms=1000
 
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index 1b2b459cb8d..cd2ed674216 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -434,6 +434,7 @@ public class TopicBasedRemoteLogMetadataManager implements 
BrokerReadyCallback,
         topicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, 
Long.toString(rlmmConfig.metadataTopicRetentionMs()));
         topicConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_DELETE);
         topicConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
"false");
+        topicConfigs.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, 
Short.toString(rlmmConfig.metadataTopicMinIsr()));
         return new NewTopic(rlmmConfig.remoteLogMetadataTopicName(),
                             rlmmConfig.metadataTopicPartitionsCount(),
                             
rlmmConfig.metadataTopicReplicationFactor()).configs(topicConfigs);
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
index 721d58e34d8..52f934ce299 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
@@ -46,6 +46,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
     public static final String 
REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP = 
"remote.log.metadata.topic.replication.factor";
     public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP = 
"remote.log.metadata.topic.num.partitions";
     public static final String REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP = 
"remote.log.metadata.topic.retention.ms";
+    public static final String REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP = 
"remote.log.metadata.topic.min.isr";
     public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP = 
"remote.log.metadata.consume.wait.ms";
     public static final String 
REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP = 
"remote.log.metadata.initialization.retry.max.timeout.ms";
     public static final String 
REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP = 
"remote.log.metadata.initialization.retry.interval.ms";
@@ -53,6 +54,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
     public static final int DEFAULT_REMOTE_LOG_METADATA_TOPIC_PARTITIONS = 50;
     public static final long DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MS = 
-1L;
     public static final short 
DEFAULT_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR = 3;
+    public static final short DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR = 2;
     public static final long DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS = 2 * 
60 * 1000L;
     public static final long 
DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS = 2 * 60 * 
1000L;
     public static final long 
DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS = 100L;
@@ -63,6 +65,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
             "Default: -1, that means unlimited. Users can configure this value 
based on their use cases. " +
             "To avoid any data loss, this value should be more than the 
maximum retention period of any topic enabled with " +
             "tiered storage in the cluster.";
+    public static final String REMOTE_LOG_METADATA_TOPIC_MIN_ISR_DOC = "The 
minimum number of replicas that must acknowledge a write to remote log metadata 
topic.";
     public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC = "The 
amount of time in milliseconds to wait for the local consumer to " +
             "receive the published event.";
     public static final String 
REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_DOC = "The retry interval 
in milliseconds for " +
@@ -90,6 +93,8 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
                       REMOTE_LOG_METADATA_TOPIC_PARTITIONS_DOC)
               .define(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, LONG, 
DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MS, LOW,
                       REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_DOC)
+              .define(REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP, SHORT, 
DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR, atLeast(1), LOW,
+                      REMOTE_LOG_METADATA_TOPIC_MIN_ISR_DOC)
               .define(REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP, LONG, 
DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS, atLeast(0), LOW,
                       REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC)
               
.define(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP, LONG,
@@ -106,6 +111,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig 
{
     private final long consumeWaitMs;
     private final long metadataTopicRetentionMs;
     private final short metadataTopicReplicationFactor;
+    private final short metadataTopicMinIsr;
     private final long initializationRetryMaxTimeoutMs;
     private final long initializationRetryIntervalMs;
 
@@ -126,6 +132,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig 
{
         if (metadataTopicRetentionMs != -1 && metadataTopicRetentionMs <= 0) {
             throw new IllegalArgumentException("Invalid metadata topic 
retention in millis: " + metadataTopicRetentionMs);
         }
+        metadataTopicMinIsr = (short) 
parsedConfigs.get(REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP);
         consumeWaitMs = (long) 
parsedConfigs.get(REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP);
         initializationRetryIntervalMs = (long) 
parsedConfigs.get(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP);
         initializationRetryMaxTimeoutMs = (long) 
parsedConfigs.get(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP);
@@ -184,6 +191,10 @@ public final class 
TopicBasedRemoteLogMetadataManagerConfig {
         return initializationRetryIntervalMs;
     }
 
+    public short metadataTopicMinIsr() {
+        return metadataTopicMinIsr;
+    }
+
     public String logDir() {
         return logDir;
     }
@@ -229,6 +240,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig 
{
                 ", consumeWaitMs=" + consumeWaitMs +
                 ", metadataTopicRetentionMs=" + metadataTopicRetentionMs +
                 ", metadataTopicReplicationFactor=" + 
metadataTopicReplicationFactor +
+                ", metadataTopicMinIsr=" + metadataTopicMinIsr +
                 ", initializationRetryMaxTimeoutMs=" + 
initializationRetryMaxTimeoutMs +
                 ", initializationRetryIntervalMs=" + 
initializationRetryIntervalMs +
                 ", commonProps=" + configMapToRedactedString(commonProps, 
AdminClientConfig.configDef()) +
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
index 8a1f44fcea2..fb8812d6b79 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
@@ -34,7 +34,7 @@ import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemo
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 
 public class RemoteLogMetadataManagerTestUtils {
-    private static final int METADATA_TOPIC_PARTITIONS_COUNT = 3;
+    static final int METADATA_TOPIC_PARTITIONS_COUNT = 3;
     private static final short METADATA_TOPIC_REPLICATION_FACTOR = 2;
     private static final long METADATA_TOPIC_RETENTION_MS = 24 * 60 * 60 * 
1000L;
 
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
index 1874416f683..e3eb05b72b9 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
@@ -27,14 +27,17 @@ import org.junit.jupiter.api.Test;
 
 import java.util.AbstractMap;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_PRODUCER_PREFIX;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
@@ -140,6 +143,10 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
         assertTrue(configString.contains("enable.auto.commit=false"));
     }
 
+    private Map<String, Object> createValidConfigProps() {
+        return this.createValidConfigProps(Collections.emptyMap(), 
Collections.emptyMap(), Collections.emptyMap());
+    }
+
     private Map<String, Object> createValidConfigProps(Map<String, Object> 
commonClientConfig,
                                                        Map<String, Object> 
producerConfig,
                                                        Map<String, Object> 
consumerConfig) {
@@ -192,4 +199,20 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
         Arrays.stream(sensitiveConfigKeys)
                 .forEach(config -> assertTrue(configString.contains(config + 
"=(redacted)")));
     }
+
+    @Test
+    public void testDefaultMinIsr() {
+        Map<String, Object> props = createValidConfigProps();
+        TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new 
TopicBasedRemoteLogMetadataManagerConfig(props);
+        assertEquals(DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR, 
rlmmConfig.metadataTopicMinIsr());
+    }
+
+    @Test
+    public void testCustomMinIsr() {
+        Map<String, Object> props = createValidConfigProps();
+        short customMinIsr = 3;
+        props.put(REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP, customMinIsr);
+        TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new 
TopicBasedRemoteLogMetadataManagerConfig(props);
+        assertEquals(customMinIsr, rlmmConfig.metadataTopicMinIsr());
+    }
 }
\ No newline at end of file
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index f564028ccaf..3ddee1b24f8 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -17,6 +17,9 @@
 package org.apache.kafka.server.log.remote.metadata.storage;
 
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
@@ -24,6 +27,8 @@ import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTestDefaults;
@@ -49,6 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -370,4 +376,54 @@ public class TopicBasedRemoteLogMetadataManagerTest {
             Exit.resetExitProcedure();
         }
     }
+
+    @ClusterTest
+    public void testRemoteLogMetadataTopicWithDefaultMinIsr() throws 
ExecutionException, InterruptedException {
+        // Initialize the manager which will create the __remote_log_metadata 
topic
+        TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager 
= topicBasedRlmm();
+        
verifyRemoteLogMetadataTopicWithMinIsr(topicBasedRemoteLogMetadataManager,
+                                               
TopicBasedRemoteLogMetadataManagerConfig.DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR,
+                                               "default value");
+    }
+
+    @ClusterTest
+    public void testRemoteLogMetadataTopicWithCustomMinIsr() throws 
ExecutionException, InterruptedException, IOException {
+        // Create a manager with custom min.isr value
+        short customMinIsr = 3;
+        Map<String, Object> overrideProps = Map.of(
+            
TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP,
 customMinIsr
+        );
+        try (TopicBasedRemoteLogMetadataManager customRlmm = 
RemoteLogMetadataManagerTestUtils.builder()
+                .bootstrapServers(clusterInstance.bootstrapServers())
+                .overrideRemoteLogMetadataManagerProps(overrideProps)
+                .build()) {
+            verifyRemoteLogMetadataTopicWithMinIsr(customRlmm, customMinIsr, 
"custom value");
+        }
+    }
+
+    private void 
verifyRemoteLogMetadataTopicWithMinIsr(TopicBasedRemoteLogMetadataManager rlmm,
+                                                        short expectedMinIsr,
+                                                        String 
valueDescription)
+                                                        throws 
ExecutionException, InterruptedException {
+        try (Admin admin = clusterInstance.admin()) {
+            String metadataTopic = 
TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+            
+            // Wait for the topic to be created
+            clusterInstance.waitTopicCreation(metadataTopic, 
RemoteLogMetadataManagerTestUtils.METADATA_TOPIC_PARTITIONS_COUNT);
+            
+            // Verify the topic exists
+            assertTrue(rlmm.doesTopicExist(admin, metadataTopic));
+            
+            // Describe the topic configs to verify min.insync.replicas
+            ConfigResource topicResource = new 
ConfigResource(ConfigResource.Type.TOPIC, metadataTopic);
+            DescribeConfigsResult describeResult = 
admin.describeConfigs(List.of(topicResource));
+            Config config = describeResult.all().get().get(topicResource);
+
+            assertNotNull(config, "Topic config should not be null");
+            ConfigEntry minIsrEntry = 
config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG);
+            assertNotNull(minIsrEntry, "min.insync.replicas config should 
exist");
+            assertEquals(String.valueOf(expectedMinIsr), minIsrEntry.value(), 
+                "min.insync.replicas should be " + expectedMinIsr + " (" + 
valueDescription + ")");
+        }
+    }
 }

Reply via email to