hlteoh37 commented on code in PR #145:
URL: 
https://github.com/apache/flink-connector-aws/pull/145#discussion_r1668396430


##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisShardSplitWithAssignmentStatus.java:
##########
@@ -0,0 +1,45 @@
+package org.apache.flink.connector.kinesis.source.enumerator;

Review Comment:
   missing license



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisShardSplitWithAssignmentStatus.java:
##########
@@ -0,0 +1,45 @@
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+
+import java.util.Objects;
+
+/** Kinesis shard split with assignment status. */
+@Internal
+public class KinesisShardSplitWithAssignmentStatus {
+    private final KinesisShardSplit kinesisShardSplit;
+    private final SplitAssignmentStatus splitAssignmentStatus;
+
+    public KinesisShardSplitWithAssignmentStatus(
+            KinesisShardSplit kinesisShardSplit, SplitAssignmentStatus 
splitAssignmentStatus) {
+        this.kinesisShardSplit = kinesisShardSplit;
+        this.splitAssignmentStatus = splitAssignmentStatus;
+    }
+
+    public KinesisShardSplit split() {
+        return kinesisShardSplit;
+    }
+
+    public SplitAssignmentStatus assignmentStatus() {
+        return splitAssignmentStatus;
+    }
+
+    @Override
+    public boolean equals(Object o) {

Review Comment:
   nit: Do we want `toString` method too?



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/SplitAssignmentStatus.java:
##########
@@ -0,0 +1,32 @@
+package org.apache.flink.connector.kinesis.source.enumerator;

Review Comment:
   missing license header!



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorState.java:
##########
@@ -19,32 +19,31 @@
 package org.apache.flink.connector.kinesis.source.enumerator;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
 
 import javax.annotation.Nullable;
 
-import java.util.Set;
+import java.util.List;
 
 /**
  * State for the {@link KinesisStreamsSourceEnumerator}. This class is stored 
in state, so any
  * changes need to be backwards compatible
  */
 @Internal
 public class KinesisStreamsSourceEnumeratorState {
-    private final Set<KinesisShardSplit> unassignedSplits;
+    private final List<KinesisShardSplitWithAssignmentStatus> splits;
     @Nullable private final String lastSeenShardId;
 
     public KinesisStreamsSourceEnumeratorState(
-            Set<KinesisShardSplit> unassignedSplits, String lastSeenShardId) {
-        this.unassignedSplits = unassignedSplits;
+            List<KinesisShardSplitWithAssignmentStatus> splits, String 
lastSeenShardId) {

Review Comment:
   nit: can we say something like `trackedSplits` or `discoveredSplits`?



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorStateSerializer.java:
##########
@@ -109,13 +110,22 @@ public KinesisStreamsSourceEnumeratorState deserialize(
                                 + ". Serializer version is "
                                 + splitSerializer.getVersion());
             }
-            Set<KinesisShardSplit> unassignedSplits = new 
HashSet<>(numUnassignedSplits);
+            List<KinesisShardSplitWithAssignmentStatus> unassignedSplits =
+                    new ArrayList<>(numUnassignedSplits);
             for (int i = 0; i < numUnassignedSplits; i++) {
                 int serializedLength = in.readInt();
                 byte[] serializedSplit = new byte[serializedLength];
                 if (in.read(serializedSplit) != -1) {
+                    KinesisShardSplit deserializedSplit =
+                            
splitSerializer.deserialize(splitSerializerVersion, serializedSplit);
+                    SplitAssignmentStatus assignmentStatus = 
SplitAssignmentStatus.UNASSIGNED;
+                    if (version == CURRENT_VERSION) {
+                        assignmentStatus = 
SplitAssignmentStatus.fromStatusCode(in.readInt());
+                    }

Review Comment:
   for previous serializer version, we should be safe to assume unassigned 
split right?



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/tracker/SplitTrackerTest.java:
##########
@@ -0,0 +1,353 @@
+package org.apache.flink.connector.kinesis.source.enumerator.tracker;

Review Comment:
   missing license header



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/tracker/SplitTracker.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.flink.connector.kinesis.source.enumerator.tracker;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisShardSplitWithAssignmentStatus;
+import 
org.apache.flink.connector.kinesis.source.enumerator.SplitAssignmentStatus;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/** This class is used to track shard hierarchy. */
+@Internal
+public class SplitTracker {
+    /**
+     * Flag controlling if tracker should wait before all parent splits will 
be completed before
+     * assigning split to readers.
+     */
+    private final boolean preserveShardOrdering;
+
+    /** Map of all discovered splits that have not been completed. */
+    private final Map<String, KinesisShardSplit> knownSplits = new 
ConcurrentHashMap<>();
+
+    /** Set of currently assigned split id. */

Review Comment:
   nit: currently assigned splits



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/tracker/SplitTracker.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.flink.connector.kinesis.source.enumerator.tracker;

Review Comment:
   missing license header



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:
##########
@@ -211,32 +301,33 @@ private void assignSplits(List<KinesisShardSplit> 
discoveredSplits, Throwable th
             throw new KinesisStreamsSourceException("Failed to list shards.", 
throwable);
         }
 
+        splitTracker.addSplits(discoveredSplits);
+        updateLastSeenShardId(discoveredSplits);
+
         if (context.registeredReaders().size() < context.currentParallelism()) 
{
             LOG.info(
                     "Insufficient registered readers, skipping assignment of 
discovered splits until all readers are registered. Required number of readers: 
{}, Registered readers: {}",
                     context.currentParallelism(),
                     context.registeredReaders().size());
-            unassignedSplits.addAll(discoveredSplits);
             return;
         }
 
+        assignSplits();
+    }
+
+    private void assignSplits() {
         Map<Integer, List<KinesisShardSplit>> newSplitAssignments = new 
HashMap<>();
-        for (KinesisShardSplit split : unassignedSplits) {
-            assignSplitToSubtask(split, newSplitAssignments);
-        }
-        unassignedSplits.clear();
-        for (KinesisShardSplit split : discoveredSplits) {
+        for (KinesisShardSplit split : 
splitTracker.splitsAvailableForAssignment()) {
             assignSplitToSubtask(split, newSplitAssignments);
         }

Review Comment:
   Noted that this is ok to be non-thread safe because it is done in the 
enumerator main thread.
   
   The interaction between source enumerator + split tracker seems like it has 
potential to have misses, as we now have 2x components. Can we add some tests 
to validate the splitTracker is called correctly?



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/tracker/SplitTracker.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.flink.connector.kinesis.source.enumerator.tracker;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisShardSplitWithAssignmentStatus;
+import 
org.apache.flink.connector.kinesis.source.enumerator.SplitAssignmentStatus;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/** This class is used to track shard hierarchy. */
+@Internal
+public class SplitTracker {
+    /**
+     * Flag controlling if tracker should wait before all parent splits will 
be completed before
+     * assigning split to readers.
+     */
+    private final boolean preserveShardOrdering;
+
+    /** Map of all discovered splits that have not been completed. */
+    private final Map<String, KinesisShardSplit> knownSplits = new 
ConcurrentHashMap<>();

Review Comment:
   Can we add details of what the key is in the doc?



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/tracker/SplitTracker.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.flink.connector.kinesis.source.enumerator.tracker;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisShardSplitWithAssignmentStatus;
+import 
org.apache.flink.connector.kinesis.source.enumerator.SplitAssignmentStatus;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/** This class is used to track shard hierarchy. */
+@Internal
+public class SplitTracker {
+    /**
+     * Flag controlling if tracker should wait before all parent splits will 
be completed before
+     * assigning split to readers.
+     */
+    private final boolean preserveShardOrdering;
+
+    /** Map of all discovered splits that have not been completed. */
+    private final Map<String, KinesisShardSplit> knownSplits = new 
ConcurrentHashMap<>();
+
+    /** Set of currently assigned split id. */
+    private final Set<String> assignedSplits = new HashSet<>();
+
+    public SplitTracker(boolean preserveShardOrdering) {
+        this(preserveShardOrdering, Collections.emptyList());
+    }
+
+    public SplitTracker(
+            boolean preserveShardOrdering,
+            List<KinesisShardSplitWithAssignmentStatus> initialState) {
+        this.preserveShardOrdering = preserveShardOrdering;
+
+        initialState.forEach(
+                splitWithStatus -> {
+                    knownSplits.put(splitWithStatus.split().splitId(), 
splitWithStatus.split());
+                    if 
(SplitAssignmentStatus.ASSIGNED.equals(splitWithStatus.assignmentStatus())) {
+                        assignedSplits.add(splitWithStatus.split().splitId());
+                    }
+                });
+    }
+
+    /**
+     * Add newly discovered splits to tracker.
+     *
+     * @param splitsToAdd collection of splits to add to tracking
+     */
+    public void addSplits(Collection<KinesisShardSplit> splitsToAdd) {
+        splitsToAdd.forEach(split -> knownSplits.put(split.splitId(), split));
+    }
+
+    /**
+     * Mark splits as assigned. Assigned splits will no longer be returned as 
pending splits.
+     *
+     * @param splitsToAssign collection of splits to mark as assigned
+     */
+    public void markAsAssigned(Collection<KinesisShardSplit> splitsToAssign) {
+        splitsToAssign.forEach(split -> assignedSplits.add(split.splitId()));
+    }
+
+    /**
+     * Mark splits with specified ids as finished.
+     *
+     * @param finishedSplitIds collection of split ids to mark as finished
+     */
+    public void markAsFinished(Collection<String> finishedSplitIds) {
+        finishedSplitIds.forEach(
+                splitId -> {
+                    assignedSplits.remove(splitId);
+                    knownSplits.remove(splitId);
+                });
+    }
+
+    /**
+     * Checks if split with specified id had been assigned to the reader.
+     *
+     * @param splitId split id
+     * @return {@code true} if split had been assigned, otherwise {@code false}
+     */
+    public boolean isAssigned(String splitId) {
+        return assignedSplits.contains(splitId);
+    }
+
+    /**
+     * Returns list of splits that can be assigned to readers. Does not 
include splits that are
+     * already assigned or finished. If shard ordering is enabled, only splits 
with finished parents
+     * will be returned.
+     *
+     * @return list of splits that can be assigned to readers.
+     */
+    public List<KinesisShardSplit> splitsAvailableForAssignment() {
+        return knownSplits.values().stream()
+                .filter(
+                        split -> {
+                            boolean splitIsNotAssigned = 
!isAssigned(split.splitId());
+                            if (preserveShardOrdering) {
+                                // Check if all parent splits were finished
+                                return splitIsNotAssigned
+                                        && 
verifyAllParentSplitsAreFinished(split);
+                            } else {
+                                return splitIsNotAssigned;
+                            }
+                        })
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Prepare split graph representation to store in state. Method returns 
only splits that are
+     * currently assigned to readers or unassigned. Finished splits are not 
included in the result.
+     *
+     * @param checkpointId id of the checkpoint
+     * @return list of splits with current assignment status
+     */
+    public List<KinesisShardSplitWithAssignmentStatus> snapshotState(long 
checkpointId) {
+        return knownSplits.values().stream()
+                .map(
+                        split -> {
+                            SplitAssignmentStatus assignmentStatus =
+                                    isAssigned(split.splitId())
+                                            ? SplitAssignmentStatus.ASSIGNED
+                                            : SplitAssignmentStatus.UNASSIGNED;
+                            return new KinesisShardSplitWithAssignmentStatus(
+                                    split, assignmentStatus);
+                        })
+                .collect(Collectors.toList());
+    }
+
+    private boolean verifyAllParentSplitsAreFinished(KinesisShardSplit split) {
+        boolean allParentsFinished = true;
+        for (String parentSplitId : split.getParentShardIds()) {
+            allParentsFinished = allParentsFinished && 
isFinished(parentSplitId);
+        }
+
+        return allParentsFinished;
+    }

Review Comment:
   Might be good to leave a comment/tests to validate the assumptions we have 
made here, that this logic only works because parent splits are always by 
definition existing in the set of known splits. 



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/SplitAssignmentStatus.java:
##########
@@ -0,0 +1,32 @@
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Assignment status of {@link 
org.apache.flink.connector.kinesis.source.split.KinesisShardSplit}.
+ */
+@Internal
+public enum SplitAssignmentStatus {
+    ASSIGNED(0),
+    UNASSIGNED(1);
+
+    private final int statusCode;
+
+    SplitAssignmentStatus(int statusCode) {
+        this.statusCode = statusCode;
+    }
+
+    public int getStatusCode() {
+        return statusCode;
+    }
+
+    public static SplitAssignmentStatus fromStatusCode(int statusCode) {
+        for (SplitAssignmentStatus status : SplitAssignmentStatus.values()) {
+            if (status.getStatusCode() == statusCode) {
+                return status;
+            }
+        }
+
+        throw new IllegalArgumentException("Unknown status code: " + 
statusCode);
+    }

Review Comment:
   Noted that we have set up the serialization arrangement for enums correctly 
so that we can add more enums safely in the future :) 



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/tracker/SplitTracker.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.flink.connector.kinesis.source.enumerator.tracker;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisShardSplitWithAssignmentStatus;
+import 
org.apache.flink.connector.kinesis.source.enumerator.SplitAssignmentStatus;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/** This class is used to track shard hierarchy. */
+@Internal
+public class SplitTracker {
+    /**
+     * Flag controlling if tracker should wait before all parent splits will 
be completed before
+     * assigning split to readers.
+     */
+    private final boolean preserveShardOrdering;
+
+    /** Map of all discovered splits that have not been completed. */
+    private final Map<String, KinesisShardSplit> knownSplits = new 
ConcurrentHashMap<>();
+
+    /** Set of currently assigned split id. */
+    private final Set<String> assignedSplits = new HashSet<>();
+
+    public SplitTracker(boolean preserveShardOrdering) {
+        this(preserveShardOrdering, Collections.emptyList());
+    }
+
+    public SplitTracker(
+            boolean preserveShardOrdering,
+            List<KinesisShardSplitWithAssignmentStatus> initialState) {
+        this.preserveShardOrdering = preserveShardOrdering;
+
+        initialState.forEach(
+                splitWithStatus -> {
+                    knownSplits.put(splitWithStatus.split().splitId(), 
splitWithStatus.split());
+                    if 
(SplitAssignmentStatus.ASSIGNED.equals(splitWithStatus.assignmentStatus())) {
+                        assignedSplits.add(splitWithStatus.split().splitId());
+                    }
+                });
+    }
+
+    /**
+     * Add newly discovered splits to tracker.
+     *
+     * @param splitsToAdd collection of splits to add to tracking
+     */
+    public void addSplits(Collection<KinesisShardSplit> splitsToAdd) {
+        splitsToAdd.forEach(split -> knownSplits.put(split.splitId(), split));
+    }
+
+    /**
+     * Mark splits as assigned. Assigned splits will no longer be returned as 
pending splits.
+     *
+     * @param splitsToAssign collection of splits to mark as assigned
+     */
+    public void markAsAssigned(Collection<KinesisShardSplit> splitsToAssign) {
+        splitsToAssign.forEach(split -> assignedSplits.add(split.splitId()));
+    }
+
+    /**
+     * Mark splits with specified ids as finished.
+     *
+     * @param finishedSplitIds collection of split ids to mark as finished
+     */
+    public void markAsFinished(Collection<String> finishedSplitIds) {
+        finishedSplitIds.forEach(
+                splitId -> {
+                    assignedSplits.remove(splitId);
+                    knownSplits.remove(splitId);
+                });
+    }
+
+    /**
+     * Checks if split with specified id had been assigned to the reader.
+     *
+     * @param splitId split id
+     * @return {@code true} if split had been assigned, otherwise {@code false}
+     */
+    public boolean isAssigned(String splitId) {
+        return assignedSplits.contains(splitId);
+    }
+
+    /**
+     * Returns list of splits that can be assigned to readers. Does not 
include splits that are
+     * already assigned or finished. If shard ordering is enabled, only splits 
with finished parents
+     * will be returned.
+     *
+     * @return list of splits that can be assigned to readers.
+     */
+    public List<KinesisShardSplit> splitsAvailableForAssignment() {
+        return knownSplits.values().stream()
+                .filter(
+                        split -> {
+                            boolean splitIsNotAssigned = 
!isAssigned(split.splitId());
+                            if (preserveShardOrdering) {
+                                // Check if all parent splits were finished
+                                return splitIsNotAssigned
+                                        && 
verifyAllParentSplitsAreFinished(split);
+                            } else {
+                                return splitIsNotAssigned;
+                            }
+                        })
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Prepare split graph representation to store in state. Method returns 
only splits that are
+     * currently assigned to readers or unassigned. Finished splits are not 
included in the result.
+     *
+     * @param checkpointId id of the checkpoint
+     * @return list of splits with current assignment status
+     */
+    public List<KinesisShardSplitWithAssignmentStatus> snapshotState(long 
checkpointId) {
+        return knownSplits.values().stream()
+                .map(
+                        split -> {
+                            SplitAssignmentStatus assignmentStatus =
+                                    isAssigned(split.splitId())
+                                            ? SplitAssignmentStatus.ASSIGNED
+                                            : SplitAssignmentStatus.UNASSIGNED;
+                            return new KinesisShardSplitWithAssignmentStatus(
+                                    split, assignmentStatus);
+                        })
+                .collect(Collectors.toList());
+    }
+
+    private boolean verifyAllParentSplitsAreFinished(KinesisShardSplit split) {

Review Comment:
   Noted that this implementation also covers the edge case, when we restore 
from a stale snapshot that is taken when not consuming at the tip of stream. 
Let's make sure we have a test case for this!
   
   ### Edge case 1
   The edge case I'm thinking of:
   ```
   A ---\   /--- E
          C 
   B ---/   \--- F
   ```
   Above I show a shard that was merged, then split. Although this is 
unrealistic in a 2 shard system, it represents a situation that can easily 
occur in large shard systems.
   
   First we imagine a snapshot is taken when stream is aware of A,B,C, but 
currently consuming from A+B.
   ```
   Current stream condition:
   A ---\   
         C 
   B ---/   
   
   Enumerator state:
   knownSplits: A,B,C
   assignedSplits: A,B
   ```
   
   Ok imagine now we keep this snapshot until the stream has expired shards A + 
B + C
   
   ```
   Current stream condition:
     /--- E
   C (expired)
     \--- F
   ```
   
   Now, what will happen?
   1. We discover splits E + F.
   ```
   Enumerator state:
   knownSplits: C,E (waiting on C), F (waiting on C)
   assignedSplits: A,B
   ```
   2. Assigned splits A + B will be marked as finished because they are 
expired. Assumed behavior of the SplitReader.
   
   ```
   Enumerator state:
   knownSplits: C,E (waiting on C), F (waiting on C)
   assignedSplits: 
   ```
   3. Enumerator will assign split C.
   4. Split C will be marked as finished because they are expired. (Once again 
assumed behavior of SplitReader)
   5. We will then start consuming from E + F.
   
   
   
   ### Edge case 2
   
   ```
   A ---\   /--- E ---\
          C            G
   B ---/   \--- F ---/
   ```
   
   
   First we imagine a snapshot is taken when stream is aware of A,B,C, but 
currently consuming from A+B.
   ```
   Current stream condition:
   A ---\   
         C 
   B ---/   
   
   Enumerator state:
   knownSplits: A,B,C
   assignedSplits: A,B
   ```
   
   Then we keep snapshot until it is stale, and shards A,B,C,E,F have expired.
   
   ```
   Current stream condition:
   E (expired) ---\
                   G
   F (expired) ---/
   
   Enumerator state:
   knownSplits: A,B,C
   assignedSplits: A,B
   ```
   
   Now, what will happen?
   1. We discover splits G.
   ```
   Enumerator state:
   knownSplits: A,B,C,G(waiting on E,F)
   assignedSplits: A,B
   ```
   2. Assigned splits A + B will be marked as finished because they are 
expired. Assumed behavior of the SplitReader.
   
   ```
   Enumerator state:
   knownSplits: C,G(waiting on E,F)
   assignedSplits: 
   ```
   3. Enumerator will assign split C + G
   4. Split C will be marked as finished because they are expired. (Once again 
assumed behavior of SplitReader). We will start consuming from G directly.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to