nsivabalan commented on a change in pull request #4848:
URL: https://github.com/apache/hudi/pull/4848#discussion_r810405559



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -320,7 +325,48 @@ private void updateWriteStatus(HoodieDeltaWriteStat stat, 
AppendResult result) {
     statuses.add(this.writeStatus);
   }
 
-  private void processAppendResult(AppendResult result) {
+  /**
+   * Get column statistics for the records part of this append handle.
+   *
+   * @param filePath       - Log file that records are part of
+   * @param recordList     - List of records appended to the log for which 
column statistics is needed for
+   * @param columnRangeMap - Output map to accumulate the column statistics 
for the records
+   */
+  private void getRecordsStats(final String filePath, List<IndexedRecord> 
recordList,
+                               Map<String, 
HoodieColumnRangeMetadata<Comparable>> columnRangeMap) {
+    recordList.forEach(record -> accumulateColumnRanges(record, 
writeSchemaWithMetaFields, filePath, columnRangeMap, 
config.isConsistentLogicalTimestampEnabled()));
+  }
+
+  /**
+   * Accumulate column range statistics for the requested record.
+   *
+   * @param record   - Record to get the column range statistics for
+   * @param schema   - Schema for the record
+   * @param filePath - File that record belongs to
+   */
+  private static void accumulateColumnRanges(IndexedRecord record, Schema 
schema, String filePath,
+          Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap, 
boolean consistentLogicalTimestampEnabled) {
+    if (!(record instanceof GenericRecord)) {
+      throw new HoodieIOException("Record is not a generic type to get column 
range metadata!");
+    }
+    schema.getFields().forEach(field -> {
+      final String fieldVal = 
HoodieAvroUtils.getNestedFieldValAsString((GenericRecord) record, field.name(), 
true, consistentLogicalTimestampEnabled);
+      final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
+      final HoodieColumnRangeMetadata<Comparable> fieldRange = new 
HoodieColumnRangeMetadata<>(
+              filePath,
+              field.name(),
+              fieldVal,
+              fieldVal,
+              fieldVal == null ? 1 : 0, // null count
+              fieldVal == null ? 0 : 1, // value count
+              fieldSize,
+              fieldSize

Review comment:
       In the case of Avro, are the total size and the total uncompressed size going 
to be the same?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -330,78 +319,67 @@ public static void deleteMetadataTable(String basePath, 
HoodieEngineContext cont
       });
     });
 
-    return engineContext.map(deleteFileList, deleteFileInfo -> {
-      return HoodieMetadataPayload.createBloomFilterMetadataRecord(
-          deleteFileInfo.getLeft(), deleteFileInfo.getRight(), instantTime, 
ByteBuffer.allocate(0), true);
-    }, 1).stream().collect(Collectors.toList());
+    HoodieData<Pair<String, String>> deleteFileListRDD = 
engineContext.parallelize(deleteFileList,
+        Math.max(deleteFileList.size(), 
recordsGenerationParams.getBloomIndexParallelism()));
+    return deleteFileListRDD.map(deleteFileInfo -> 
HoodieMetadataPayload.createBloomFilterMetadataRecord(
+        deleteFileInfo.getLeft(), deleteFileInfo.getRight(), instantTime, 
StringUtils.EMPTY_STRING,
+        ByteBuffer.allocate(0), true));
   }
 
   /**
    * Convert clean metadata to column stats index records.
    *
-   * @param cleanMetadata     - Clean action metadata
-   * @param engineContext     - Engine context
-   * @param datasetMetaClient - data table meta client
+   * @param cleanMetadata           - Clean action metadata
+   * @param engineContext           - Engine context
+   * @param recordsGenerationParams - Parameters for bloom filter record 
generation
    * @return List of column stats index records for the clean metadata
    */
-  public static List<HoodieRecord> 
convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata,
-                                                                       
HoodieEngineContext engineContext,
-                                                                       
HoodieTableMetaClient datasetMetaClient) {
+  public static HoodieData<HoodieRecord> 
convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata,
+                                                                             
HoodieEngineContext engineContext,
+                                                                             
MetadataRecordsGenerationParams recordsGenerationParams) {
     List<Pair<String, String>> deleteFileList = new ArrayList<>();
     cleanMetadata.getPartitionMetadata().forEach((partition, 
partitionMetadata) -> {
       // Files deleted from a partition
       List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
       deletedFiles.forEach(entry -> deleteFileList.add(Pair.of(partition, 
entry)));
     });
 
-    List<String> latestColumns = getLatestColumns(datasetMetaClient);
-    return engineContext.flatMap(deleteFileList,
-        deleteFileInfo -> {
-          if 
(deleteFileInfo.getRight().endsWith(HoodieFileFormat.PARQUET.getFileExtension()))
 {
-            return getColumnStats(deleteFileInfo.getKey(), 
deleteFileInfo.getValue(), datasetMetaClient,
-                latestColumns, true);
-          }
-          return Stream.empty();
-        }, 1).stream().collect(Collectors.toList());
+    final List<String> columnsToIndex = 
getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), 
recordsGenerationParams.isAllColumnStatsIndexEnabled());

Review comment:
       Is it possible to move the generation of columnsToIndex to some higher 
layer and avoid repeated computation?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -330,78 +319,67 @@ public static void deleteMetadataTable(String basePath, 
HoodieEngineContext cont
       });
     });
 
-    return engineContext.map(deleteFileList, deleteFileInfo -> {
-      return HoodieMetadataPayload.createBloomFilterMetadataRecord(
-          deleteFileInfo.getLeft(), deleteFileInfo.getRight(), instantTime, 
ByteBuffer.allocate(0), true);
-    }, 1).stream().collect(Collectors.toList());
+    HoodieData<Pair<String, String>> deleteFileListRDD = 
engineContext.parallelize(deleteFileList,
+        Math.max(deleteFileList.size(), 
recordsGenerationParams.getBloomIndexParallelism()));

Review comment:
       Again: if using min makes sense here, please apply the same fix in all the other places.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -831,7 +828,7 @@ public static HoodieTableFileSystemView 
getFileSystemView(HoodieTableMetaClient
    * @param datasetMetaClient                   - Data table meta client
    * @param isMetaIndexColumnStatsForAllColumns - Is column stats indexing 
enabled for all columns
    */
-  private static List<String> getLatestColumns(HoodieTableMetaClient 
datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) {
+  private static List<String> getColumnsToIndex(HoodieTableMetaClient 
datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) {

Review comment:
       A comment about L834: I feel we can't directly take in 
RecordKeyFieldProp as is; it may not work for all key generators. 
   Maybe we have to split it on "," and then set the columns to index. 
   Can you think of any other places where we have this dependency, 
and check whether we have done the right thing there?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -320,7 +325,48 @@ private void updateWriteStatus(HoodieDeltaWriteStat stat, 
AppendResult result) {
     statuses.add(this.writeStatus);
   }
 
-  private void processAppendResult(AppendResult result) {
+  /**
+   * Get column statistics for the records part of this append handle.
+   *
+   * @param filePath       - Log file that records are part of
+   * @param recordList     - List of records appended to the log for which 
column statistics is needed for
+   * @param columnRangeMap - Output map to accumulate the column statistics 
for the records
+   */
+  private void getRecordsStats(final String filePath, List<IndexedRecord> 
recordList,

Review comment:
       Maybe we can name this "setRecordsStats". 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -165,6 +165,12 @@
           + "used for pruning files during the index lookups. Only applies if "
           + ENABLE_METADATA_INDEX_COLUMN_STATS.key() + " is enabled.A");
 
+  public static final ConfigProperty<Integer> COLUMN_STATS_INDEX_PARALLELISM = 
ConfigProperty
+          .key(METADATA_PREFIX + ".index.column.stats.parallelism")
+          .defaultValue(1)

Review comment:
       Why 1? Maybe we can make this 10 instead. 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -871,27 +879,39 @@ protected void bootstrapCommit(List<DirectoryInfo> 
partitionInfoList, String cre
         return HoodieMetadataPayload.createPartitionFilesRecord(
             partitionInfo.getRelativePath().isEmpty() ? NON_PARTITIONED_NAME : 
partitionInfo.getRelativePath(), Option.of(validFileNameToSizeMap), 
Option.empty());
       });
-      partitionRecords = partitionRecords.union(fileListRecords);
+      filesPartitionRecords = filesPartitionRecords.union(fileListRecords);

Review comment:
       I did leave this comment in one of the previous patches, but can we move 
the partition path name deduction into a method and reuse that everywhere? 
   ```
   partitionInfo.getRelativePath().isEmpty() ? NON_PARTITIONED_NAME : 
partitionInfo.getRelativePath()
   ```
   We already had some bugs around non-partitioned datasets, so I wanted to keep 
this logic in one place. 
   
   Also this one:
   ```
   String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName;
   ```

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -187,94 +178,90 @@ public static void deleteMetadataTable(String basePath, 
HoodieEngineContext cont
   /**
    * Convert commit action metadata to bloom filter records.
    *
-   * @param commitMetadata - Commit action metadata
-   * @param dataMetaClient - Meta client for the data table
-   * @param instantTime    - Action instant time
-   * @return List of metadata table records
+   * @param context                 - Engine context to use
+   * @param commitMetadata          - Commit action metadata
+   * @param instantTime             - Action instant time
+   * @param recordsGenerationParams - Parameters for bloom filter record 
generation
+   * @return HoodieData of metadata table records
    */
-  public static List<HoodieRecord> 
convertMetadataToBloomFilterRecords(HoodieCommitMetadata commitMetadata,
-                                                                       
HoodieTableMetaClient dataMetaClient,
-                                                                       String 
instantTime) {
-    List<HoodieRecord> records = new LinkedList<>();
-    commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, 
writeStats) -> {
-      final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) 
? NON_PARTITIONED_NAME : partitionStatName;
-      Map<String, Long> newFiles = new HashMap<>(writeStats.size());
-      writeStats.forEach(hoodieWriteStat -> {
-        // No action for delta logs
-        if (hoodieWriteStat instanceof HoodieDeltaWriteStat) {
-          return;
-        }
+  public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(
+      HoodieEngineContext context, HoodieCommitMetadata commitMetadata,
+      String instantTime, MetadataRecordsGenerationParams 
recordsGenerationParams) {
+    final List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(entry -> entry.stream()).collect(Collectors.toList());
+    if (allWriteStats.isEmpty()) {
+      return context.emptyHoodieData();
+    }
 
-        String pathWithPartition = hoodieWriteStat.getPath();
-        if (pathWithPartition == null) {
-          // Empty partition
-          LOG.error("Failed to find path in write stat to update metadata 
table " + hoodieWriteStat);
-          return;
-        }
-        int offset = partition.equals(NON_PARTITIONED_NAME) ? 
(pathWithPartition.startsWith("/") ? 1 : 0) :
-            partition.length() + 1;
+    HoodieData<HoodieWriteStat> allWriteStatsRDD = 
context.parallelize(allWriteStats,
+        Math.max(recordsGenerationParams.getBloomIndexParallelism(), 
allWriteStats.size()));

Review comment:
       Shouldn't we be doing min here instead of max? If there are only 10 
writeStats, why do we parallelize across 100 (if the config is set to 100)? 
   This is applicable everywhere we do this based on colsStatsParallelism and 
bloomIndexParallelism. 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -609,82 +578,124 @@ private static void 
processRollbackMetadata(HoodieActiveTimeline metadataTableTi
   }
 
   /**
-   * Convert rollback action metadata to bloom filter index records.
+   * Convert added and deleted files metadata to bloom filter index records.
    */
-  private static List<HoodieRecord> 
convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
-                                                                     
HoodieTableMetaClient dataMetaClient,
-                                                                     
Map<String, List<String>> partitionToDeletedFiles,
-                                                                     
Map<String, Map<String, Long>> partitionToAppendedFiles,
-                                                                     String 
instantTime) {
-    List<HoodieRecord> records = new LinkedList<>();
-    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> 
deletedFileList.forEach(deletedFile -> {
-      if (!FSUtils.isBaseFile(new Path(deletedFile))) {
-        return;
-      }
+  public static HoodieData<HoodieRecord> 
convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
+                                                                          
Map<String, List<String>> partitionToDeletedFiles,
+                                                                          
Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                          
MetadataRecordsGenerationParams recordsGenerationParams,
+                                                                          
String instantTime) {
+    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
+
+    List<Pair<String, List<String>>> partitionToDeletedFilesList = 
partitionToDeletedFiles.entrySet()
+        .stream().map(e -> Pair.of(e.getKey(), 
e.getValue())).collect(Collectors.toList());
+    HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD = 
engineContext.parallelize(partitionToDeletedFilesList,
+        Math.max(partitionToDeletedFilesList.size(), 
recordsGenerationParams.getBloomIndexParallelism()));
+
+    HoodieData<HoodieRecord> deletedFilesRecordsRDD = 
partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesEntry -> {
+      final String partitionName = partitionToDeletedFilesEntry.getLeft();
+      final List<String> deletedFileList = 
partitionToDeletedFilesEntry.getRight();
+      return deletedFileList.stream().flatMap(deletedFile -> {
+        if (!FSUtils.isBaseFile(new Path(deletedFile))) {
+          return Stream.empty();
+        }
 
-      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName;
-      records.add(HoodieMetadataPayload.createBloomFilterMetadataRecord(
-          partition, deletedFile, instantTime, ByteBuffer.allocate(0), true));
-    }));
+        final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName;
+        return 
Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
+            partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, 
ByteBuffer.allocate(0), true));
+      }).iterator();
+    });
+    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
 
-    partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
+    List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = 
partitionToAppendedFiles.entrySet()
+        .stream().map(entry -> Pair.of(entry.getKey(), 
entry.getValue())).collect(Collectors.toList());
+    HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD = 
engineContext.parallelize(partitionToAppendedFilesList,
+        Math.max(partitionToAppendedFiles.size(), 
recordsGenerationParams.getBloomIndexParallelism()));
+
+    HoodieData<HoodieRecord> appendedFilesRecordsRDD = 
partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesEntry -> {
+      final String partitionName = partitionToAppendedFilesEntry.getKey();
+      final Map<String, Long> appendedFileMap = 
partitionToAppendedFilesEntry.getValue();
       final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName;
-      appendedFileMap.forEach((appendedFile, length) -> {
+      return 
appendedFileMap.entrySet().stream().flatMap(appendedFileLengthPairEntry -> {
+        final String appendedFile = appendedFileLengthPairEntry.getKey();
         if (!FSUtils.isBaseFile(new Path(appendedFile))) {
-          return;
+          return Stream.empty();
         }
         final String pathWithPartition = partitionName + "/" + appendedFile;
-        final Path appendedFilePath = new Path(dataMetaClient.getBasePath(), 
pathWithPartition);
-        try {
-          HoodieFileReader<IndexedRecord> fileReader =
-              
HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), 
appendedFilePath);
+        final Path appendedFilePath = new 
Path(recordsGenerationParams.getDataMetaClient().getBasePath(), 
pathWithPartition);
+        try (HoodieFileReader<IndexedRecord> fileReader =
+                 
HoodieFileReaderFactory.getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(),
 appendedFilePath)) {
           final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
           if (fileBloomFilter == null) {
             LOG.error("Failed to read bloom filter for " + appendedFilePath);
-            return;
+            return Stream.empty();
           }
           ByteBuffer bloomByteBuffer = 
ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
           HoodieRecord record = 
HoodieMetadataPayload.createBloomFilterMetadataRecord(
-              partition, appendedFile, instantTime, bloomByteBuffer, false);
-          records.add(record);
-          fileReader.close();
+              partition, appendedFile, instantTime, 
recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false);
+          return Stream.of(record);
         } catch (IOException e) {
           LOG.error("Failed to get bloom filter for file: " + 
appendedFilePath);
         }
-      });
+        return Stream.empty();
+      }).iterator();
     });
-    return records;
+    allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD);
+
+    return allRecordsRDD;
   }
 
   /**
-   * Convert rollback action metadata to column stats index records.
+   * Convert added and deleted action metadata to column stats index records.
    */
-  private static List<HoodieRecord> 
convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
-                                                                     
HoodieTableMetaClient datasetMetaClient,
-                                                                     
Map<String, List<String>> partitionToDeletedFiles,
-                                                                     
Map<String, Map<String, Long>> partitionToAppendedFiles,
-                                                                     String 
instantTime) {
-    List<HoodieRecord> records = new LinkedList<>();
-    List<String> latestColumns = getLatestColumns(datasetMetaClient);
-    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> 
deletedFileList.forEach(deletedFile -> {
+  public static HoodieData<HoodieRecord> 
convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
+                                                                          
Map<String, List<String>> partitionToDeletedFiles,
+                                                                          
Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                          
MetadataRecordsGenerationParams recordsGenerationParams) {
+    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
+    final List<String> columnsToIndex = 
getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), 
recordsGenerationParams.isAllColumnStatsIndexEnabled());
+
+    final List<Pair<String, List<String>>> partitionToDeletedFilesList = 
partitionToDeletedFiles.entrySet()
+        .stream().map(e -> Pair.of(e.getKey(), 
e.getValue())).collect(Collectors.toList());
+    final HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD = 
engineContext.parallelize(partitionToDeletedFilesList,
+        Math.max(partitionToDeletedFilesList.size(), 
recordsGenerationParams.getColumnStatsIndexParallelism()));
+
+    HoodieData<HoodieRecord> deletedFilesRecordsRDD = 
partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesEntry -> {
+      final String partitionName = partitionToDeletedFilesEntry.getLeft();
       final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName;
-      if (deletedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+      final List<String> deletedFileList = 
partitionToDeletedFilesEntry.getRight();
+
+      return deletedFileList.stream().flatMap(deletedFile -> {
         final String filePathWithPartition = partitionName + "/" + deletedFile;
-        records.addAll(getColumnStats(partition, filePathWithPartition, 
datasetMetaClient,
-            latestColumns, true).collect(Collectors.toList()));
-      }
-    }));
-
-    partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> 
appendedFileMap.forEach(
-        (appendedFile, size) -> {
-          final String partition = partitionName.equals(EMPTY_PARTITION_NAME) 
? NON_PARTITIONED_NAME : partitionName;
-          if 
(appendedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-            final String filePathWithPartition = partitionName + "/" + 
appendedFile;
-            records.addAll(getColumnStats(partition, filePathWithPartition, 
datasetMetaClient,
-                latestColumns, false).collect(Collectors.toList()));
-          }
-        }));
-    return records;
+        return getColumnStats(partition, filePathWithPartition, 
recordsGenerationParams.getDataMetaClient(), columnsToIndex, true);
+      }).iterator();
+    });
+    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
+
+    final List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = 
partitionToAppendedFiles.entrySet()
+        .stream().map(entry -> Pair.of(entry.getKey(), 
entry.getValue())).collect(Collectors.toList());
+    final HoodieData<Pair<String, Map<String, Long>>> 
partitionToAppendedFilesRDD = 
engineContext.parallelize(partitionToAppendedFilesList,
+        Math.max(partitionToAppendedFiles.size(), 
recordsGenerationParams.getColumnStatsIndexParallelism()));
+
+    HoodieData<HoodieRecord> appendedFilesRecordsRDD = 
partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesEntry -> {
+      final String partitionName = partitionToAppendedFilesEntry.getLeft();
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName;
+      final Map<String, Long> appendedFileMap = 
partitionToAppendedFilesEntry.getRight();
+
+      return 
appendedFileMap.entrySet().stream().flatMap(appendedFileNameLengthPair -> {
+        // TODO: HUDI-3374 Handle log files without delta write stat to get 
records column stats

Review comment:
       Can we remove this comment?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to