codope commented on code in PR #9553:
URL: https://github.com/apache/hudi/pull/9553#discussion_r1307430567
##########
hudi-common/src/main/java/org/apache/hudi/common/fs/StorageSchemes.java:
##########
@@ -25,69 +25,74 @@
*/
public enum StorageSchemes {
// Local filesystem
- FILE("file", false, false, true),
+ FILE("file", false, false, true, false),
// Hadoop File System
- HDFS("hdfs", true, false, true),
+ HDFS("hdfs", true, false, true, true),
// Baidu Advanced File System
- AFS("afs", true, null, null),
+ AFS("afs", true, null, null, null),
// Mapr File System
- MAPRFS("maprfs", true, null, null),
+ MAPRFS("maprfs", true, null, null, null),
// Apache Ignite FS
- IGNITE("igfs", true, null, null),
+ IGNITE("igfs", true, null, null, null),
// AWS S3
- S3A("s3a", false, true, null), S3("s3", false, true, null),
+ S3A("s3a", false, true, null, false), S3("s3", false, true, null, false),
// Google Cloud Storage
- GCS("gs", false, true, null),
+ GCS("gs", false, true, null, null),
Review Comment:
Why is GCS null, while S3 is false?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -276,4 +283,27 @@ protected static Option<IndexedRecord>
toAvroRecord(HoodieRecord record, Schema
return Option.empty();
}
}
+
+ protected class AppendLogWriteCallback implements HoodieLogFileWriteCallback
{
+ // we will create marker file for each log file and include all log files
to commit message
+ // so that all log files generated in this job will be included in commit
metadata. This is important for MDT.
+ // Assuming if spark task failed, the log file generated by it will not
appear in WriteStatus
+
+ @Override
+ public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
+ return createAppendMarker(logFileToAppend);
+ }
+
+ @Override
+ public boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
+ // TODO: HUDI-1517 may distinguish log file created from log file being
appended in the future @guanziyue
+ return createAppendMarker(logFileToCreate);
+ }
+
+ private boolean createAppendMarker(HoodieLogFile logFileToAppend) {
+ WriteMarkers writeMarkers =
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+ return writeMarkers.create(partitionPath, logFileToAppend.getFileName(),
IOType.APPEND,
Review Comment:
`createIfNotExists`?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -104,6 +129,162 @@ public boolean commit(String instantTime,
JavaRDD<WriteStatus> writeStatuses, Op
return commitStats(instantTime, HoodieJavaRDD.of(writeStatuses),
writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds,
extraPreCommitFunc);
}
+ protected void commit(HoodieTable table, String commitActionType, String
instantTime, HoodieCommitMetadata metadata,
+ List<HoodieWriteStat> stats, HoodieData<WriteStatus>
writeStatuses) throws IOException {
+ LOG.info("Committing " + instantTime + " action " + commitActionType);
+ HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+ HoodieCommitMetadata fixedCommitMetadata =
addMissingLogFileIfNeeded(table, commitActionType, instantTime, metadata);
+ // Finalize write
+ finalizeWrite(table, instantTime, stats);
+ // do save internal schema to support Implicitly add columns in write
process
+ if
(!fixedCommitMetadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA)
+ && fixedCommitMetadata.getExtraMetadata().containsKey(SCHEMA_KEY) &&
table.getConfig().getSchemaEvolutionEnable()) {
+ saveInternalSchema(table, instantTime, fixedCommitMetadata);
+ }
+ // update Metadata table
+ writeTableMetadata(table, instantTime, fixedCommitMetadata, writeStatuses);
+ activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType,
instantTime),
+
Option.of(fixedCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ }
+
+ /* In spark mor table, any failed spark task may generate log files which
are not included in write status.
+ * We need to add these to CommitMetadata so that it will be synced to MDT
and make MDT has correct file info.
+ */
+ private HoodieCommitMetadata addMissingLogFileIfNeeded(HoodieTable table,
String commitActionType, String instantTime,
+ HoodieCommitMetadata
commitMetadata) throws IOException {
+ if
(!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.MERGE_ON_READ)
+ || !commitActionType.equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION))
{
+ return commitMetadata;
+ }
+
+ HoodieCommitMetadata metadata = commitMetadata;
+ WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(),
table, instantTime);
+ // if there is log files in this delta commit, we search any invalid log
files generated by failed spark task
+ boolean hasLogFileInDeltaCommit = metadata.getPartitionToWriteStats()
+ .values().stream().flatMap(List::stream)
+ .anyMatch(writeStat -> FSUtils.isLogFile(new
Path(config.getBasePath(), writeStat.getPath()).getName()));
+ if (hasLogFileInDeltaCommit) {
+ // get all log files generated by log mark file
+ Set<String> logFilesMarkerPath = new
HashSet<>(markers.getAppendedLogPaths(context,
config.getFinalizeWriteParallelism()));
Review Comment:
this will incur a list call for the marker dir (per deltacommit)
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1023,14 +1023,19 @@ public void update(HoodieRollbackMetadata
rollbackMetadata, String instantTime)
validateRollback(commitToRollbackInstantTime, compactionInstant,
deltacommitsSinceCompaction);
- // lets apply a delta commit with DT's rb instant(with special suffix)
containing following records:
+ // lets apply a delta commit with DT's rb instant containing following
records:
// a. any log files as part of RB commit metadata that was added
// b. log files added by the commit in DT being rolled back. By rolled
back, we mean, a rollback block will be added and does not mean it will be
deleted.
// both above list should only be added to FILES partition.
-
- String rollbackInstantTime = createRollbackTimestamp(instantTime);
+ if (deltacommitsSinceCompaction.containsInstant(instantTime)) {
+ LOG.info("Rolling back MDT deltacommit for " + instantTime + ", since
previous attempt failed");
+ if (!getWriteClient().rollback(instantTime)) {
Review Comment:
so a rollback in DT could trigger 2 rollbacks in MDT?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileWriteCallback.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.hudi.common.model.HoodieLogFile;
+
+/**
+ * HoodieLogFileWriteCallback is trigger when specific log file operation
happen
+ */
+public interface HoodieLogFileWriteCallback {
+ default boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
+ return true;
+ }
+
+ default boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
+ return true;
+ }
+
+ default boolean preLogFileClose(HoodieLogFile logFileToClose) {
+ return true;
+ }
+
+ default boolean postLogFileClose(HoodieLogFile logFileToClose) {
Review Comment:
this does not close anything?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java:
##########
@@ -79,14 +93,28 @@ public List<HoodieRollbackStat>
performRollback(HoodieEngineContext context, Hoo
// stack trace:
https://gist.github.com/nsivabalan/b6359e7d5038484f8043506c8bc9e1c8
// related stack overflow post:
https://issues.apache.org/jira/browse/SPARK-3601. Avro deserializes list as
GenericData.Array.
List<SerializableHoodieRollbackRequest> serializableRequests =
rollbackRequests.stream().map(SerializableHoodieRollbackRequest::new).collect(Collectors.toList());
- return context.reduceByKey(maybeDeleteAndCollectStats(context,
instantToRollback, serializableRequests, true, parallelism),
- RollbackUtils::mergeRollbackStat, parallelism);
+ WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(),
table, instantTime);
+
+ // Considering rollback may failed before, which generated some additional
log files. We need to add these log files back.
+ Set<String> logPaths = new HashSet<>();
+ try {
+ logPaths = markers.getAppendedLogPaths(context,
config.getFinalizeWriteParallelism());
+ } catch (FileNotFoundException fnf) {
+ LOG.warn("Rollback never failed and hence no marker dir was found.
Safely moving on");
+ } catch (IOException e) {
+ throw new HoodieRollbackException("Failed to list log file markers for
previous attempt of rollback ", e);
+ }
+
+ List<Pair<String, HoodieRollbackStat>> getRollbackStats =
maybeDeleteAndCollectStats(context, instantTime, instantToRollback,
serializableRequests, true, parallelism,
+ logPaths);
+ List<HoodieRollbackStat> mergedRollbackStatByPartitionPath =
context.reduceByKey(getRollbackStats, RollbackUtils::mergeRollbackStat,
parallelism);
Review Comment:
will trigger a shuffle
##########
hudi-common/src/main/java/org/apache/hudi/common/fs/StorageSchemes.java:
##########
@@ -25,69 +25,74 @@
*/
public enum StorageSchemes {
// Local filesystem
- FILE("file", false, false, true),
+ FILE("file", false, false, true, false),
// Hadoop File System
- HDFS("hdfs", true, false, true),
+ HDFS("hdfs", true, false, true, true),
// Baidu Advanced File System
- AFS("afs", true, null, null),
+ AFS("afs", true, null, null, null),
// Mapr File System
- MAPRFS("maprfs", true, null, null),
+ MAPRFS("maprfs", true, null, null, null),
// Apache Ignite FS
- IGNITE("igfs", true, null, null),
+ IGNITE("igfs", true, null, null, null),
// AWS S3
- S3A("s3a", false, true, null), S3("s3", false, true, null),
+ S3A("s3a", false, true, null, false), S3("s3", false, true, null, false),
// Google Cloud Storage
- GCS("gs", false, true, null),
+ GCS("gs", false, true, null, null),
// Azure WASB
- WASB("wasb", false, null, null), WASBS("wasbs", false, null, null),
+ WASB("wasb", false, null, null, null), WASBS("wasbs", false, null, null,
null),
// Azure ADLS
- ADL("adl", false, null, null),
+ ADL("adl", false, null, null, null),
// Azure ADLS Gen2
- ABFS("abfs", false, null, null), ABFSS("abfss", false, null, null),
+ ABFS("abfs", false, null, null, null), ABFSS("abfss", false, null, null,
null),
// Aliyun OSS
- OSS("oss", false, null, null),
+ OSS("oss", false, null, null, null),
// View FS for federated setups. If federating across cloud stores, then
append support is false
// View FS support atomic creation
- VIEWFS("viewfs", true, null, true),
+ VIEWFS("viewfs", true, null, true, null),
//ALLUXIO
- ALLUXIO("alluxio", false, null, null),
+ ALLUXIO("alluxio", false, null, null, null),
// Tencent Cloud Object Storage
- COSN("cosn", false, null, null),
+ COSN("cosn", false, null, null, null),
// Tencent Cloud HDFS
- CHDFS("ofs", true, null, null),
+ CHDFS("ofs", true, null, null, null),
// Tencent Cloud CacheFileSystem
- GOOSEFS("gfs", false, null, null),
+ GOOSEFS("gfs", false, null, null, null),
// Databricks file system
- DBFS("dbfs", false, null, null),
+ DBFS("dbfs", false, null, null, null),
// IBM Cloud Object Storage
- COS("cos", false, null, null),
+ COS("cos", false, null, null, null),
// Huawei Cloud Object Storage
- OBS("obs", false, null, null),
+ OBS("obs", false, null, null, null),
// Kingsoft Standard Storage ks3
- KS3("ks3", false, null, null),
+ KS3("ks3", false, null, null, null),
// JuiceFileSystem
- JFS("jfs", true, null, null),
+ JFS("jfs", true, null, null, null),
// Baidu Object Storage
- BOS("bos", false, null, null),
+ BOS("bos", false, null, null, null),
// Oracle Cloud Infrastructure Object Storage
- OCI("oci", false, null, null),
+ OCI("oci", false, null, null, null),
// Volcengine Object Storage
- TOS("tos", false, null, null),
+ TOS("tos", false, null, null, null),
// Volcengine Cloud HDFS
- CFS("cfs", true, null, null);
+ CFS("cfs", true, null, null, null);
private String scheme;
private boolean supportsAppend;
// null for uncertain if write is transactional, please update this for each
FS
private Boolean isWriteTransactional;
// null for uncertain if dfs support atomic create&delete, please update
this for each FS
private Boolean supportAtomicCreation;
+ // list files may bring pressure to storage with centralized meta service
like HDFS.
+ // when we want to get only part of files under a directory rather than all
files, use getStatus may be more friendly than listStatus.
+ // here is a trade-off between rpc times and throughput of storage meta
service
+ private Boolean listStatusUnfriendly;
Review Comment:
do consider cloud costs as well. List calls on cloud object storage is not
cheap.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -276,4 +283,27 @@ protected static Option<IndexedRecord>
toAvroRecord(HoodieRecord record, Schema
return Option.empty();
}
}
+
+ protected class AppendLogWriteCallback implements HoodieLogFileWriteCallback
{
Review Comment:
This can be in `HoodieAppendHandle`.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -104,6 +129,162 @@ public boolean commit(String instantTime,
JavaRDD<WriteStatus> writeStatuses, Op
return commitStats(instantTime, HoodieJavaRDD.of(writeStatuses),
writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds,
extraPreCommitFunc);
}
+ protected void commit(HoodieTable table, String commitActionType, String
instantTime, HoodieCommitMetadata metadata,
+ List<HoodieWriteStat> stats, HoodieData<WriteStatus>
writeStatuses) throws IOException {
+ LOG.info("Committing " + instantTime + " action " + commitActionType);
+ HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+ HoodieCommitMetadata fixedCommitMetadata =
addMissingLogFileIfNeeded(table, commitActionType, instantTime, metadata);
+ // Finalize write
+ finalizeWrite(table, instantTime, stats);
+ // do save internal schema to support Implicitly add columns in write
process
+ if
(!fixedCommitMetadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA)
+ && fixedCommitMetadata.getExtraMetadata().containsKey(SCHEMA_KEY) &&
table.getConfig().getSchemaEvolutionEnable()) {
+ saveInternalSchema(table, instantTime, fixedCommitMetadata);
+ }
+ // update Metadata table
+ writeTableMetadata(table, instantTime, fixedCommitMetadata, writeStatuses);
+ activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType,
instantTime),
+
Option.of(fixedCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ }
+
+ /* In spark mor table, any failed spark task may generate log files which
are not included in write status.
+ * We need to add these to CommitMetadata so that it will be synced to MDT
and make MDT has correct file info.
+ */
+ private HoodieCommitMetadata addMissingLogFileIfNeeded(HoodieTable table,
String commitActionType, String instantTime,
+ HoodieCommitMetadata
commitMetadata) throws IOException {
+ if
(!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.MERGE_ON_READ)
+ || !commitActionType.equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION))
{
+ return commitMetadata;
+ }
+
+ HoodieCommitMetadata metadata = commitMetadata;
+ WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(),
table, instantTime);
+ // if there is log files in this delta commit, we search any invalid log
files generated by failed spark task
+ boolean hasLogFileInDeltaCommit = metadata.getPartitionToWriteStats()
+ .values().stream().flatMap(List::stream)
+ .anyMatch(writeStat -> FSUtils.isLogFile(new
Path(config.getBasePath(), writeStat.getPath()).getName()));
+ if (hasLogFileInDeltaCommit) {
+ // get all log files generated by log mark file
+ Set<String> logFilesMarkerPath = new
HashSet<>(markers.getAppendedLogPaths(context,
config.getFinalizeWriteParallelism()));
+
+ // remove valid log files
+ for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats :
metadata.getPartitionToWriteStats().entrySet()) {
+ for (HoodieWriteStat hoodieWriteStat :
partitionAndWriteStats.getValue()) {
+ logFilesMarkerPath.remove(hoodieWriteStat.getPath());
+ }
+ }
+
+ // remaining are log files generated by failed spark task, let's
generate write stat for them
+ if (logFilesMarkerPath.size() > 0) {
+ // populate partition -> map (fileId -> HoodieWriteStat) // we just
need one write stat per fileID to fetch some info about the file slice of
interest when we want to add a new WriteStat.
+ List<Pair<String, Map<String, HoodieWriteStat>>>
partitionToFileIdAndWriteStatList = new ArrayList<>();
+ for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats :
metadata.getPartitionToWriteStats().entrySet()) {
+ String partition = partitionAndWriteStats.getKey();
+ Map<String, HoodieWriteStat> fileIdToWriteStat = new HashMap<>();
+ partitionAndWriteStats.getValue().forEach(writeStat -> {
+ String fileId = writeStat.getFileId();
+ if (!fileIdToWriteStat.containsKey(fileId)) {
+ fileIdToWriteStat.put(fileId, writeStat);
+ }
+ });
+ partitionToFileIdAndWriteStatList.add(Pair.of(partition,
fileIdToWriteStat));
+ }
+
+ final Path basePath = new Path(config.getBasePath());
+ Map<String, Map<String, List<String>>>
partitionToFileIdAndMissingLogFiles = new HashMap<>();
+ logFilesMarkerPath
+ .stream()
+ .forEach(logFilePathStr -> {
+ Path logFileFullPath = new Path(config.getBasePath(),
logFilePathStr);
+ String fileID = FSUtils.getFileId(logFileFullPath.getName());
+ String partitionPath =
FSUtils.getRelativePartitionPath(basePath, logFileFullPath.getParent());
+ if
(!partitionToFileIdAndMissingLogFiles.containsKey(partitionPath)) {
+ partitionToFileIdAndMissingLogFiles.put(partitionPath, new
HashMap<>());
+ }
+ if
(!partitionToFileIdAndMissingLogFiles.get(partitionPath).containsKey(fileID)) {
+
partitionToFileIdAndMissingLogFiles.get(partitionPath).put(fileID, new
ArrayList<>());
+ }
+
partitionToFileIdAndMissingLogFiles.get(partitionPath).get(fileID).add(logFilePathStr);
+ });
+
+ context.setJobStatus(this.getClass().getSimpleName(), "generate
writeStat for missing log files");
+
+ // populate partition -> map (fileId -> List <missing log file>)
+ List<Map.Entry<String, Map<String, List<String>>>> missingFilesInfo =
partitionToFileIdAndMissingLogFiles.entrySet().stream().collect(Collectors.toList());
+ HoodiePairData<String, Map<String, List<String>>>
partitionToMissingLogFilesHoodieData =
context.parallelize(missingFilesInfo).mapToPair(
+ (SerializablePairFunction<Map.Entry<String, Map<String,
List<String>>>, String, Map<String, List<String>>>) t -> Pair.of(t.getKey(),
t.getValue()));
+
+ // populate partition -> map (fileId -> HoodieWriteStat) // we just
need one write stat per fileId to fetch some info about the file slice of
interest.
+ HoodiePairData<String, Map<String, HoodieWriteStat>>
partitionToWriteStatHoodieData =
context.parallelize(partitionToFileIdAndWriteStatList).mapToPair(
+ (SerializablePairFunction<Pair<String, Map<String,
HoodieWriteStat>>, String, Map<String, HoodieWriteStat>>) t -> t);
+
+ SerializableConfiguration serializableConfiguration = new
SerializableConfiguration(hadoopConf);
+
+ // lets do left outer join to add write stats for missing log files
+ List<Pair<String, List<HoodieWriteStat>>> additionalLogFileWriteStat =
partitionToWriteStatHoodieData
+ .join(partitionToMissingLogFilesHoodieData)
+ .map((SerializableFunction<Pair<String, Pair<Map<String,
HoodieWriteStat>, Map<String, List<String>>>>, Pair<String,
List<HoodieWriteStat>>>) v1 -> {
+ final Path basePathLocal = new Path(config.getBasePath());
+ String partitionPath = v1.getKey();
+ Map<String, HoodieWriteStat> fileIdToOriginalWriteStat =
v1.getValue().getKey();
+ Map<String, List<String>> missingFileIdToLogFilesList =
v1.getValue().getValue();
+
+ List<HoodieWriteStat> missingWriteStats = new ArrayList();
+ List<String> missingLogFilesForPartition = new ArrayList();
+ missingFileIdToLogFilesList.values().forEach(entry ->
missingLogFilesForPartition.addAll(entry));
+
+ // fetch file sizes for missing log files
+ Path fullPartitionPath = new Path(config.getBasePath(),
partitionPath);
+ FileSystem fileSystem =
fullPartitionPath.getFileSystem(serializableConfiguration.get());
+ List<Option<FileStatus>> fileStatues =
FSUtils.getFileStatusesUnderPartition(fileSystem, fullPartitionPath,
missingLogFilesForPartition, true);
Review Comment:
then again list status per missing file right.. can we not change the
`WriteMarkers#getAppendedLogPaths` to return a collection of `FileStatus`
instead of strings.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java:
##########
@@ -79,14 +93,28 @@ public List<HoodieRollbackStat>
performRollback(HoodieEngineContext context, Hoo
// stack trace:
https://gist.github.com/nsivabalan/b6359e7d5038484f8043506c8bc9e1c8
// related stack overflow post:
https://issues.apache.org/jira/browse/SPARK-3601. Avro deserializes list as
GenericData.Array.
List<SerializableHoodieRollbackRequest> serializableRequests =
rollbackRequests.stream().map(SerializableHoodieRollbackRequest::new).collect(Collectors.toList());
- return context.reduceByKey(maybeDeleteAndCollectStats(context,
instantToRollback, serializableRequests, true, parallelism),
- RollbackUtils::mergeRollbackStat, parallelism);
+ WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(),
table, instantTime);
+
+ // Considering rollback may failed before, which generated some additional
log files. We need to add these log files back.
+ Set<String> logPaths = new HashSet<>();
+ try {
+ logPaths = markers.getAppendedLogPaths(context,
config.getFinalizeWriteParallelism());
Review Comment:
another list operation?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]