kfaraz commented on code in PR #18436:
URL: https://github.com/apache/druid/pull/18436#discussion_r2302821097


##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java:
##########
@@ -77,4 +82,6 @@ default int getMaxColumnsToMerge()
   {
     return IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE;
   }
+
+

Review Comment:
   Nit: extra new lines



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java:
##########
@@ -38,6 +38,11 @@ public interface AppenderatorConfig extends TuningConfig
 
   boolean isSkipBytesInMemoryOverheadCheck();
 
+  default boolean getReleaseLocksOnHandoff()

Review Comment:
   ```suggestion
     default boolean isReleaseLocksOnHandoff()
   ```



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -1559,6 +1567,22 @@ public Void apply(@Nullable Object input)
     );
   }
 
+  private void unlockIntervalIfApplicable(Sink abandonedSink)
+  {
+    Interval abandonedInterval = abandonedSink.getInterval();
+    boolean isIntervalActive = sinks.entrySet().stream()
+                                    .anyMatch(entry -> {
+                                      Sink sink = entry.getValue();
+                                      return !Objects.equals(sink, 
abandonedSink)
+                                             && sink.isWritable()
+                                             && 
sink.getInterval().overlaps(abandonedInterval);
+                                    });
+    if (!isIntervalActive) {
+      taskIntervalUnlocker.releaseLock(abandonedInterval);
+    }
+    log.info("implement this.");

Review Comment:
   ```suggestion
       if (isIntervalActive) {
         log.info("Interval[%s] is still being appended to by sink[%s]", abandonedInterval, sinkId);
       } else {
         log.info("Unlocking interval[%s] as there are no more active sinks for it.", abandonedInterval);
         taskIntervalUnlocker.releaseLock(abandonedInterval);
       }
   ```



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -1559,6 +1567,22 @@ public Void apply(@Nullable Object input)
     );
   }
 
+  private void unlockIntervalIfApplicable(Sink abandonedSink)
+  {
+    Interval abandonedInterval = abandonedSink.getInterval();
+    boolean isIntervalActive = sinks.entrySet().stream()
+                                    .anyMatch(entry -> {
+                                      Sink sink = entry.getValue();
+                                      return !Objects.equals(sink, 
abandonedSink)
+                                             && sink.isWritable()

Review Comment:
   What do these two conditions signify?
   Please add a javadoc to this method briefly calling out the cases when the 
interval would be unlocked.
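
   For illustration, a minimal self-contained sketch of the kind of javadoc being asked for. `SinkSketch` and `UnlockDecisionSketch` are hypothetical stand-ins (plain millisecond ranges instead of Joda `Interval`, identity comparison instead of `Objects.equals`), not the actual Druid classes:

   ```java
   import java.util.List;

   /** Hypothetical stand-in for a Sink: a half-open range [startMillis, endMillis) and a writable flag. */
   final class SinkSketch
   {
     final long startMillis;
     final long endMillis;
     final boolean writable;

     SinkSketch(long startMillis, long endMillis, boolean writable)
     {
       this.startMillis = startMillis;
       this.endMillis = endMillis;
       this.writable = writable;
     }

     /** Half-open overlap check: abutting ranges do not overlap. */
     boolean overlaps(SinkSketch other)
     {
       return startMillis < other.endMillis && other.startMillis < endMillis;
     }
   }

   final class UnlockDecisionSketch
   {
     /**
      * Decides whether the lock on the interval of an abandoned sink may be released.
      * The interval is unlocked only if no OTHER sink is both
      * (a) still writable, i.e. still being appended to, and
      * (b) overlapping the interval of the abandoned sink.
      */
     static boolean shouldUnlock(List<SinkSketch> sinks, SinkSketch abandonedSink)
     {
       return sinks.stream().noneMatch(
           sink -> sink != abandonedSink       // ignore the sink being abandoned
                   && sink.writable            // condition (a)
                   && sink.overlaps(abandonedSink) // condition (b)
       );
     }
   }
   ```

   A non-writable sink never blocks the unlock in this sketch, even if it overlaps the abandoned interval.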



##########
server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java:
##########
@@ -2449,6 +2450,118 @@ public void testSchemaAnnouncement() throws Exception
     }
   }
 
+
+  @Test
+  public void test_abandonSegment_unlockIntervalWithOverlap() throws Exception

Review Comment:
   ```suggestion
     public void test_dropSegment_unlocksInterval() throws Exception
   ```



##########
server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java:
##########
@@ -2449,6 +2450,118 @@ public void testSchemaAnnouncement() throws Exception
     }
   }
 
+
+  @Test
+  public void test_abandonSegment_unlockIntervalWithOverlap() throws Exception
+  {
+    final List<Interval> unlockedIntervals = new ArrayList<>();
+    final TaskIntervalUnlocker mockUnlocker = interval -> {
+      synchronized (unlockedIntervals) {
+        unlockedIntervals.add(interval);
+      }
+    };
+
+    try (final StreamAppenderatorTester tester = new 
StreamAppenderatorTester.Builder()
+        .basePersistDirectory(temporaryFolder.newFolder())
+        .maxRowsInMemory(2)
+        .releaseLocksOnHandoff(true)
+        .taskIntervalUnlocker(mockUnlocker)
+        .build()) {
+      final Appenderator appenderator = tester.getAppenderator();
+
+      appenderator.startJob();
+
+      final SegmentIdWithShardSpec identifier1 = 
si("2000-01-01T00:00/2000-01-01T01:00", "version1", 0);
+      final SegmentIdWithShardSpec identifier2 = 
si("2000-01-01T01:00/2000-01-01T02:00", "version1", 0);
+
+      final InputRow row1 = new MapBasedInputRow(
+          DateTimes.of("2000"),
+          ImmutableList.of("dim1"),
+          ImmutableMap.of("dim1", "bar", "met1", 1)
+      );
+
+      final InputRow row2 = new MapBasedInputRow(
+          DateTimes.of("2000-01-01T02:30"),
+          ImmutableList.of("dim1"),
+          ImmutableMap.of("dim1", "baz", "met1", 1)
+      );
+
+      appenderator.add(identifier1, row1, 
Suppliers.ofInstance(Committers.nil()), false);
+      appenderator.add(identifier2, row2, 
Suppliers.ofInstance(Committers.nil()), false);
+
+      Assert.assertEquals(2, appenderator.getSegments().size());
+
+      synchronized (unlockedIntervals) {
+        unlockedIntervals.clear();
+      }
+
+      appenderator.drop(identifier1).get();
+
+      synchronized (unlockedIntervals) {
+        Assert.assertEquals(1, unlockedIntervals.size());
+        Assert.assertEquals(identifier1.getInterval(), 
unlockedIntervals.get(0));
+      }
+
+      Assert.assertEquals(1, appenderator.getSegments().size());
+      Assert.assertTrue(appenderator.getSegments().contains(identifier2));
+    }
+  }
+
+  @Test
+  public void test_abandonSegment_shouldNotUnlockInterval() throws Exception

Review Comment:
   ```suggestion
     public void test_dropSegment_skipsUnlockInterval_ifOverlappingSinkIsActive() throws Exception
   ```



##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java:
##########
@@ -56,28 +59,33 @@ public class EmbeddedKafkaSupervisorTest extends 
EmbeddedClusterTestBase
   private final EmbeddedBroker broker = new EmbeddedBroker();
   private final EmbeddedIndexer indexer = new EmbeddedIndexer();
   private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  private final EmbeddedHistorical historical = new EmbeddedHistorical();
   private KafkaResource kafkaServer;
 
   @Override
   public EmbeddedDruidCluster createCluster()
   {
     final EmbeddedDruidCluster cluster = 
EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper();
+    indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+    overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");
 
     kafkaServer = new KafkaResource();
 
     cluster.addExtension(KafkaIndexTaskModule.class)
+           .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s")

Review Comment:
   Why is this needed?



##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java:
##########
@@ -56,28 +59,33 @@ public class EmbeddedKafkaSupervisorTest extends 
EmbeddedClusterTestBase
   private final EmbeddedBroker broker = new EmbeddedBroker();
   private final EmbeddedIndexer indexer = new EmbeddedIndexer();
   private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  private final EmbeddedHistorical historical = new EmbeddedHistorical();
   private KafkaResource kafkaServer;
 
   @Override
   public EmbeddedDruidCluster createCluster()
   {
     final EmbeddedDruidCluster cluster = 
EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper();
+    indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+    overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");

Review Comment:
   Overlord does not seem to be polling segments in this test. We can remove 
this.



##########
server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java:
##########
@@ -174,7 +174,8 @@ public BatchAppenderatorTester(
         0L,
         OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE,
-        basePersistDirectory == null ? createNewBasePersistDirectory() : 
basePersistDirectory
+        basePersistDirectory == null ? createNewBasePersistDirectory() : 
basePersistDirectory,
+        false

Review Comment:
   Pass null for default value.



##########
services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java:
##########
@@ -374,6 +375,17 @@ public SupervisorStatus getSupervisorStatus(String 
supervisorId)
     throw new ISE("Could not find supervisor[%s]", supervisorId);
   }
 
+  /**
+   * Fetches the currently locked intervals by tasks.
+   * 
+   * @param lockFilterPolicies List of filters for different datasources.
+   * @return Map from datasource name to list of intervals locked by tasks.
+   */
+  public Map<String, List<Interval>> getLockedIntervals(List<LockFilterPolicy> 
lockFilterPolicies)

Review Comment:
   Nit: I don't think this warrants a new method. This can be inlined in the 
one place it is currently being used.



##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java:
##########
@@ -2911,7 +2911,8 @@ private KafkaIndexTask createTask(
         maxParseExceptions,
         maxSavedParseExceptions,
         null,
-        null
+        null,
+        false

Review Comment:
   Nit: Pass `null` instead so that we fallback to the default, even if it 
changes in the future.
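
   A minimal sketch of the nullable-argument pattern this relies on. `TuningConfigSketch` and its default of `false` are assumptions for illustration, not the actual tuning config:

   ```java
   /** Hypothetical config class: a null argument means "use the default". */
   final class TuningConfigSketch
   {
     // Assumed default; the real default may differ or change later.
     static final boolean DEFAULT_RELEASE_LOCKS_ON_HANDOFF = false;

     private final boolean releaseLocksOnHandoff;

     TuningConfigSketch(Boolean releaseLocksOnHandoff)
     {
       // Only an explicit non-null value overrides the default.
       this.releaseLocksOnHandoff = releaseLocksOnHandoff == null
                                    ? DEFAULT_RELEASE_LOCKS_ON_HANDOFF
                                    : releaseLocksOnHandoff;
     }

     boolean isReleaseLocksOnHandoff()
     {
       return releaseLocksOnHandoff;
     }
   }
   ```

   A test that passes `null` keeps tracking the default, whereas a test that hard-codes `false` silently pins the old behavior if the default ever changes.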



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java:
##########
@@ -293,6 +298,12 @@ public int getMaxColumnsToMerge()
     return maxColumnsToMerge;
   }
 
+  @JsonProperty
+  public boolean getReleaseLockOnHandoff()

Review Comment:
   Nit: Primitive boolean getters are named `isXyz()` by convention.
   Boxed `Boolean` getters may be named `getXyz()`.
   
   ```suggestion
     public boolean isReleaseLockOnHandoff()
   ```



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/TaskIntervalUnlocker.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import org.joda.time.Interval;
+
+/**
+ * This interface provides a callback mechanism to interact with TaskLockbox 
for releasing interval locks when
+ * the segments are handed off. We need this interface to avoid cyclic 
dependencues because the
+ * {@code TaskLockbox} is in druid-indexing-service module
+ */
+@FunctionalInterface
+public interface TaskIntervalUnlocker
+{
+  /**
+   * Releases the lock for the given interval.

Review Comment:
   We should check the implementation on the Overlord side and call it out here 
on whether this releases locks for this exact interval or any lock held by this 
Task that _overlaps_ this interval.



##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java:
##########
@@ -119,6 +127,13 @@ public void test_runKafkaSupervisor()
     
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
     supervisorStatus = cluster.callApi().getSupervisorStatus(supervisorId);
     Assertions.assertTrue(supervisorStatus.isSuspended());
+    indexer.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("ingest/handoff/count")
+                      .hasDimension(DruidMetrics.DATASOURCE, 
List.of(dataSource)),
+        agg -> agg.hasSumAtLeast(10)
+    );
+    Thread.sleep(5000);

Review Comment:
   ```suggestion
       overlord.latchableEmitter().waitForEventAggregate(
           event -> event.hasMetricName("task/action/run/time")
                         .hasDimension(DruidMetrics.DATASOURCE, dataSource)
                         .hasDimension(DruidMetrics.TASK_ACTION_TYPE, "lockRelease"),
           agg -> agg.hasCountAtLeast(10)
       );
   ```



##########
server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java:
##########
@@ -2449,6 +2450,118 @@ public void testSchemaAnnouncement() throws Exception
     }
   }
 
+
+  @Test
+  public void test_abandonSegment_unlockIntervalWithOverlap() throws Exception
+  {
+    final List<Interval> unlockedIntervals = new ArrayList<>();
+    final TaskIntervalUnlocker mockUnlocker = interval -> {
+      synchronized (unlockedIntervals) {
+        unlockedIntervals.add(interval);
+      }
+    };
+
+    try (final StreamAppenderatorTester tester = new 
StreamAppenderatorTester.Builder()
+        .basePersistDirectory(temporaryFolder.newFolder())
+        .maxRowsInMemory(2)
+        .releaseLocksOnHandoff(true)
+        .taskIntervalUnlocker(mockUnlocker)
+        .build()) {
+      final Appenderator appenderator = tester.getAppenderator();
+
+      appenderator.startJob();
+
+      final SegmentIdWithShardSpec identifier1 = 
si("2000-01-01T00:00/2000-01-01T01:00", "version1", 0);
+      final SegmentIdWithShardSpec identifier2 = 
si("2000-01-01T01:00/2000-01-01T02:00", "version1", 0);
+
+      final InputRow row1 = new MapBasedInputRow(
+          DateTimes.of("2000"),
+          ImmutableList.of("dim1"),
+          ImmutableMap.of("dim1", "bar", "met1", 1)
+      );
+
+      final InputRow row2 = new MapBasedInputRow(
+          DateTimes.of("2000-01-01T02:30"),
+          ImmutableList.of("dim1"),
+          ImmutableMap.of("dim1", "baz", "met1", 1)
+      );
+
+      appenderator.add(identifier1, row1, 
Suppliers.ofInstance(Committers.nil()), false);
+      appenderator.add(identifier2, row2, 
Suppliers.ofInstance(Committers.nil()), false);
+
+      Assert.assertEquals(2, appenderator.getSegments().size());
+
+      synchronized (unlockedIntervals) {
+        unlockedIntervals.clear();
+      }

Review Comment:
   Isn't this a fresh list? Why do we need to clear it?



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java:
##########
@@ -38,6 +38,11 @@ public interface AppenderatorConfig extends TuningConfig
 
   boolean isSkipBytesInMemoryOverheadCheck();
 
+  default boolean getReleaseLocksOnHandoff()

Review Comment:
   Please add a short javadoc.
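
   Something along these lines would do. This is only a sketch: `AppenderatorConfigSketch` is a hypothetical stand-in, the method uses the `is` prefix for a primitive boolean, and the `false` default is an assumption:

   ```java
   /** Hypothetical stand-in for the config interface under review. */
   interface AppenderatorConfigSketch
   {
     /**
      * Whether the task should release the lock on an interval once the
      * segments for that interval have been handed off and no other
      * writable sink overlaps it. Assumed to be disabled by default.
      */
     default boolean isReleaseLocksOnHandoff()
     {
       return false;
     }
   }
   ```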



##########
server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java:
##########
@@ -2449,6 +2450,118 @@ public void testSchemaAnnouncement() throws Exception
     }
   }
 
+
+  @Test
+  public void test_abandonSegment_unlockIntervalWithOverlap() throws Exception
+  {
+    final List<Interval> unlockedIntervals = new ArrayList<>();
+    final TaskIntervalUnlocker mockUnlocker = interval -> {

Review Comment:
   ```suggestion
       final TaskIntervalUnlocker intervalUnlocker = interval -> {
   ```



##########
server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java:
##########
@@ -2449,6 +2450,118 @@ public void testSchemaAnnouncement() throws Exception
     }
   }
 
+
+  @Test
+  public void test_abandonSegment_unlockIntervalWithOverlap() throws Exception
+  {
+    final List<Interval> unlockedIntervals = new ArrayList<>();
+    final TaskIntervalUnlocker mockUnlocker = interval -> {
+      synchronized (unlockedIntervals) {
+        unlockedIntervals.add(interval);
+      }
+    };
+
+    try (final StreamAppenderatorTester tester = new 
StreamAppenderatorTester.Builder()
+        .basePersistDirectory(temporaryFolder.newFolder())
+        .maxRowsInMemory(2)
+        .releaseLocksOnHandoff(true)
+        .taskIntervalUnlocker(mockUnlocker)
+        .build()) {
+      final Appenderator appenderator = tester.getAppenderator();
+
+      appenderator.startJob();
+
+      final SegmentIdWithShardSpec identifier1 = 
si("2000-01-01T00:00/2000-01-01T01:00", "version1", 0);
+      final SegmentIdWithShardSpec identifier2 = 
si("2000-01-01T01:00/2000-01-01T02:00", "version1", 0);

Review Comment:
   ```suggestion
         final SegmentIdWithShardSpec segmentId2 = si("2000-01-01T01:00/2000-01-01T02:00", "version1", 0);
   ```



##########
server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java:
##########
@@ -2449,6 +2450,118 @@ public void testSchemaAnnouncement() throws Exception
     }
   }
 
+
+  @Test
+  public void test_abandonSegment_unlockIntervalWithOverlap() throws Exception
+  {
+    final List<Interval> unlockedIntervals = new ArrayList<>();
+    final TaskIntervalUnlocker mockUnlocker = interval -> {
+      synchronized (unlockedIntervals) {
+        unlockedIntervals.add(interval);
+      }
+    };
+
+    try (final StreamAppenderatorTester tester = new 
StreamAppenderatorTester.Builder()
+        .basePersistDirectory(temporaryFolder.newFolder())
+        .maxRowsInMemory(2)
+        .releaseLocksOnHandoff(true)
+        .taskIntervalUnlocker(mockUnlocker)
+        .build()) {
+      final Appenderator appenderator = tester.getAppenderator();
+
+      appenderator.startJob();
+
+      final SegmentIdWithShardSpec identifier1 = 
si("2000-01-01T00:00/2000-01-01T01:00", "version1", 0);

Review Comment:
   ```suggestion
         final SegmentIdWithShardSpec segmentId1 = si("2000-01-01T00:00/2000-01-01T01:00", "version1", 0);
   ```



##########
server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java:
##########
@@ -2449,6 +2450,118 @@ public void testSchemaAnnouncement() throws Exception
     }
   }
 
+
+  @Test
+  public void test_abandonSegment_unlockIntervalWithOverlap() throws Exception
+  {
+    final List<Interval> unlockedIntervals = new ArrayList<>();
+    final TaskIntervalUnlocker mockUnlocker = interval -> {
+      synchronized (unlockedIntervals) {
+        unlockedIntervals.add(interval);
+      }
+    };
+
+    try (final StreamAppenderatorTester tester = new 
StreamAppenderatorTester.Builder()
+        .basePersistDirectory(temporaryFolder.newFolder())
+        .maxRowsInMemory(2)
+        .releaseLocksOnHandoff(true)
+        .taskIntervalUnlocker(mockUnlocker)
+        .build()) {
+      final Appenderator appenderator = tester.getAppenderator();
+
+      appenderator.startJob();
+
+      final SegmentIdWithShardSpec identifier1 = 
si("2000-01-01T00:00/2000-01-01T01:00", "version1", 0);
+      final SegmentIdWithShardSpec identifier2 = 
si("2000-01-01T01:00/2000-01-01T02:00", "version1", 0);
+
+      final InputRow row1 = new MapBasedInputRow(
+          DateTimes.of("2000"),
+          ImmutableList.of("dim1"),
+          ImmutableMap.of("dim1", "bar", "met1", 1)
+      );
+
+      final InputRow row2 = new MapBasedInputRow(
+          DateTimes.of("2000-01-01T02:30"),
+          ImmutableList.of("dim1"),
+          ImmutableMap.of("dim1", "baz", "met1", 1)
+      );
+
+      appenderator.add(identifier1, row1, 
Suppliers.ofInstance(Committers.nil()), false);
+      appenderator.add(identifier2, row2, 
Suppliers.ofInstance(Committers.nil()), false);
+
+      Assert.assertEquals(2, appenderator.getSegments().size());
+
+      synchronized (unlockedIntervals) {
+        unlockedIntervals.clear();
+      }
+
+      appenderator.drop(identifier1).get();
+
+      synchronized (unlockedIntervals) {

Review Comment:
   You can avoid the `synchronized` by using a thread-safe collection like some 
concurrent map or set.
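
   A rough sketch of what that could look like, using a concurrent set from the JDK. `IntervalUnlockerSketch` is a hypothetical stand-in for `TaskIntervalUnlocker`, and intervals are plain strings here to keep the example self-contained:

   ```java
   import java.util.Set;
   import java.util.concurrent.ConcurrentHashMap;

   /** Hypothetical stand-in for the unlocker callback; takes a String instead of an Interval. */
   @FunctionalInterface
   interface IntervalUnlockerSketch
   {
     void releaseLock(String interval);
   }

   final class RecordingUnlockerSketch
   {
     // Thread-safe set, so neither the callback nor the assertions need synchronized blocks.
     final Set<String> unlockedIntervals = ConcurrentHashMap.newKeySet();

     // The callback just records every interval it is asked to unlock.
     final IntervalUnlockerSketch unlocker = unlockedIntervals::add;
   }
   ```

   Note that a set drops duplicates, so if the test needs to assert how many times an interval was unlocked, a `CopyOnWriteArrayList` would be the closer replacement for the original list.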



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/TaskIntervalUnlocker.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import org.joda.time.Interval;
+
+/**
+ * This interface provides a callback mechanism to interact with TaskLockbox 
for releasing interval locks when
+ * the segments are handed off. We need this interface to avoid cyclic 
dependencues because the
+ * {@code TaskLockbox} is in druid-indexing-service module
+ */
+@FunctionalInterface
+public interface TaskIntervalUnlocker
+{
+  /**
+   * Releases the lock for the given interval.
+   *
+   * @param interval interval for which the lock needs to be released

Review Comment:
   We can omit this as it doesn't add any new info.



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -1559,6 +1567,22 @@ public Void apply(@Nullable Object input)
     );
   }
 
+  private void unlockIntervalIfApplicable(Sink abandonedSink)
+  {
+    Interval abandonedInterval = abandonedSink.getInterval();
+    boolean isIntervalActive = sinks.entrySet().stream()
+                                    .anyMatch(entry -> {
+                                      Sink sink = entry.getValue();
+                                      return !Objects.equals(sink, 
abandonedSink)
+                                             && sink.isWritable()
+                                             && 
sink.getInterval().overlaps(abandonedInterval);
+                                    });
+    if (!isIntervalActive) {
+      taskIntervalUnlocker.releaseLock(abandonedInterval);
+    }
+    log.info("implement this.");

Review Comment:
   Also, given that this is a new feature, I am not sure if we should throw an exception if the unlock fails.
   Can you please check if throwing an exception here will cause the task itself to fail or have any other side effect?
   
   Let's do the following instead:
   - Add `throws IOException` to `TaskIntervalUnlocker.releaseLock()`
   - Put the actual call to `taskIntervalUnlocker.releaseLock()` inside try/catch
   - Emit an alert if unlock fails
   
   In a follow up PR:
   Let's also discuss the possibility of some clean up mechanism that gets rid of orphan locks.
   Maybe the Overlord could have some kind of duty to perform automated clean up of orphan locks.
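
   The three steps listed above could be sketched roughly as follows. All names here are hypothetical stand-ins: the interval is a plain string, and the `alerts` list takes the place of whatever alert emitter the task actually uses:

   ```java
   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical variant of the unlocker callback that declares IOException. */
   @FunctionalInterface
   interface ThrowingIntervalUnlockerSketch
   {
     void releaseLock(String interval) throws IOException;
   }

   final class SafeUnlockSketch
   {
     // Stand-in for an alert emitter; collects alert messages for inspection.
     final List<String> alerts = new ArrayList<>();

     /**
      * Attempts to release the lock. A failure is reported as an alert
      * and is not propagated, so the caller keeps running.
      */
     boolean tryUnlock(ThrowingIntervalUnlockerSketch unlocker, String interval)
     {
       try {
         unlocker.releaseLock(interval);
         return true;
       }
       catch (IOException e) {
         alerts.add("Failed to unlock interval[" + interval + "]: " + e.getMessage());
         return false;
       }
     }
   }
   ```

   Only `IOException` is swallowed in this sketch; an unchecked exception from the callback would still propagate to the caller.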



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

