aokolnychyi commented on a change in pull request #2925:
URL: https://github.com/apache/iceberg/pull/2925#discussion_r808173455
##########
File path: api/src/main/java/org/apache/iceberg/ReplacePartitions.java
##########
@@ -49,4 +55,41 @@
* @return this for method chaining
*/
ReplacePartitions validateAppendOnly();
+
+ /**
+ * Set the snapshot ID used in validations for this operation.
+ *
+ * All validations will check changes after this snapshot ID. If this is not called, validation will occur
Review comment:
Is the Javadoc about the default behavior still accurate?
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -285,22 +337,18 @@ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotI
ManifestGroup conflictGroup = new ManifestGroup(ops.io(), manifests, ImmutableList.of())
.caseSensitive(caseSensitive)
.filterManifestEntries(entry -> newSnapshots.contains(entry.snapshotId()))
- .filterData(conflictDetectionFilter)
.specsById(base.specsById())
.ignoreDeleted()
.ignoreExisting();
- try (CloseableIterator<ManifestEntry<DataFile>> conflicts = conflictGroup.entries().iterator()) {
- if (conflicts.hasNext()) {
- throw new ValidationException("Found conflicting files that can contain records matching %s: %s",
- conflictDetectionFilter,
- Iterators.toString(Iterators.transform(conflicts, entry -> entry.file().path().toString())));
- }
-
- } catch (IOException e) {
- throw new UncheckedIOException(
- String.format("Failed to validate no appends matching %s", conflictDetectionFilter), e);
+ if (conflictDetectionFilter != null) {
+ conflictGroup = conflictGroup.filterData(conflictDetectionFilter);
}
+ if (partitionSet != null) {
Review comment:
nit: can you add empty lines before and after this if block? It will
separate logically independent blocks.
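Something like this is what I mean (second block's body elided, as in the diff):
```
if (conflictDetectionFilter != null) {
  conflictGroup = conflictGroup.filterData(conflictDetectionFilter);
}

if (partitionSet != null) {
  // ...
}
```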
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -263,6 +265,28 @@ private ManifestFile copyManifest(ManifestFile manifest) {
current.formatVersion(), toCopy, current.specsById(), newManifestPath, snapshotId(), appendedManifestsSummary);
}
+ /**
+ * Validates that no files matching given partitions have been added to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param partitionSet a set of partitions to filter new conflicting data files
+ */
+ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotId, PartitionSet partitionSet) {
+ CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+ addedDataFiles(base, startingSnapshotId, null, partitionSet);
+
+ try (CloseableIterator<ManifestEntry<DataFile>> conflicts = conflictEntries.iterator()) {
+ if (conflicts.hasNext()) {
+ throw new ValidationException("Found conflicting files that can contain records matching partitions %s: %s",
+ partitionSet,
+ Iterators.toString(Iterators.transform(conflicts, entry -> entry.file().path().toString())));
+ }
+ } catch (IOException e) {
Review comment:
nit: let's add an empty line before the catch block to match the
existing code in the method below.
##########
File path: core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
##########
@@ -53,6 +62,38 @@ public ReplacePartitions validateAppendOnly() {
return this;
}
+ @Override
+ public ReplacePartitions validateFromSnapshot(long newStartingSnapshotId) {
+ this.startingSnapshotId = newStartingSnapshotId;
+ return this;
+ }
+
+ @Override
+ public ReplacePartitions validateNoConflictingDeletes() {
+ this.validateNewDeleteFiles = true;
+ return this;
+ }
+
+ @Override
+ public ReplacePartitions validateNoConflictingData() {
+ this.validateNewDataFiles = true;
+ return this;
+ }
+
+ @Override
+ public void validate(TableMetadata currentMetadata) {
+ if (validateNewDataFiles) {
+ if (dataSpec().isUnpartitioned()) {
+ validateAddedDataFiles(currentMetadata, startingSnapshotId, Expressions.alwaysTrue());
+ } else {
+ validateAddedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions);
+ }
+ }
+ if (validateNewDeleteFiles) {
Review comment:
Do we also have to check for concurrently deleted data files? I am wondering about two use cases.
- We overwrite partition `p1` and someone concurrently drops the entire partition before we commit. That will remove all data files in `p1` without adding new data or delete files.
- We overwrite partition `p1` and someone concurrently overwrites all files in `p1` and adds them to `p2` (e.g. UPDATE).

Both cases should fail our operation, right?
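A rough sketch of the extra check I have in mind (`validateDeletedDataFiles` is hypothetical here; it would have to scan DELETED manifest entries since the starting snapshot, analogous to `validateAddedDataFiles`):
```
@Override
public void validate(TableMetadata currentMetadata) {
  if (validateNewDataFiles) {
    // existing check: no data files were concurrently added to the replaced partitions
    validateAddedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions);

    // hypothetical extra check: no data files in the replaced partitions were concurrently removed
    // (covers a concurrent DROP PARTITION or an UPDATE that rewrites rows into another partition)
    validateDeletedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions);
  }
}
```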
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -379,24 +423,63 @@ private void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingS
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
- * @param dataFilter an expression used to find new conflicting delete files
+ * @param dataFilter an expression used to filter new conflicting delete files
*/
protected void validateNoNewDeleteFiles(TableMetadata base, Long startingSnapshotId, Expression dataFilter) {
- // if there is no current table state, no files have been added
+ DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, dataFilter, null);
+ ValidationException.check(deletes.isEmpty(),
+ "Found new conflicting delete files that can apply to records matching %s: %s",
+ dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
+ }
+
+ /**
+ * Validates that no delete files matching a partition set have been added to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param partitionSet a partition set used to filter new conflicting delete files
+ */
+ protected void validateNoNewDeleteFiles(TableMetadata base, Long startingSnapshotId,
+ PartitionSet partitionSet) {
+ DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, null, partitionSet);
+ ValidationException.check(deletes.isEmpty(),
+ "Found new conflicting delete files that can apply to records matching %s: %s",
+ partitionSet, Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
+ }
+
+ /**
+ * Returns matching delete files have been added to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param dataFilter an expression used to filter delete files
+ * @param partitionSet a partition set used to filter delete files
+ */
+ protected DeleteFileIndex addedDeleteFiles(TableMetadata base, Long startingSnapshotId, Expression dataFilter,
+ PartitionSet partitionSet) {
+ // if there is no current table state, return empty delete file index
if (base.currentSnapshot() == null || base.formatVersion() < 2) {
- return;
+ return DeleteFileIndex.builderFor(ops.io(), Lists.newArrayList()).specsById(base.specsById()).build();
}
Pair<List<ManifestFile>, Set<Long>> history =
validationHistory(base, startingSnapshotId, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES);
List<ManifestFile> deleteManifests = history.first();
long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId);
- DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter);
+ DeleteFileIndex.Builder deleteIndexBuilder = DeleteFileIndex.builderFor(ops.io(), deleteManifests)
+ .afterSequenceNumber(startingSequenceNumber)
+ .caseSensitive(caseSensitive)
+ .specsById(ops.current().specsById());
- ValidationException.check(deletes.isEmpty(),
- "Found new conflicting delete files that can apply to records matching %s: %s",
- dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
+ if (partitionSet != null) {
+ deleteIndexBuilder = deleteIndexBuilder.filterPartitions(partitionSet);
+ }
+ if (dataFilter != null) {
Review comment:
nit: add an empty line before the if block
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -379,24 +423,63 @@ private void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingS
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
- * @param dataFilter an expression used to find new conflicting delete files
+ * @param dataFilter an expression used to filter new conflicting delete files
*/
protected void validateNoNewDeleteFiles(TableMetadata base, Long startingSnapshotId, Expression dataFilter) {
- // if there is no current table state, no files have been added
+ DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, dataFilter, null);
+ ValidationException.check(deletes.isEmpty(),
+ "Found new conflicting delete files that can apply to records matching %s: %s",
+ dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
+ }
+
+ /**
+ * Validates that no delete files matching a partition set have been added to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param partitionSet a partition set used to filter new conflicting delete files
+ */
+ protected void validateNoNewDeleteFiles(TableMetadata base, Long startingSnapshotId,
+ PartitionSet partitionSet) {
+ DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, null, partitionSet);
+ ValidationException.check(deletes.isEmpty(),
+ "Found new conflicting delete files that can apply to records matching %s: %s",
+ partitionSet, Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
+ }
+
+ /**
+ * Returns matching delete files have been added to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param dataFilter an expression used to filter delete files
+ * @param partitionSet a partition set used to filter delete files
+ */
+ protected DeleteFileIndex addedDeleteFiles(TableMetadata base, Long startingSnapshotId, Expression dataFilter,
+ PartitionSet partitionSet) {
+ // if there is no current table state, return empty delete file index
if (base.currentSnapshot() == null || base.formatVersion() < 2) {
- return;
+ return DeleteFileIndex.builderFor(ops.io(), Lists.newArrayList()).specsById(base.specsById()).build();
Review comment:
nit: I'd probably split this into multiple lines and potentially use
`ImmutableList.of()` or `Collections.emptyList()`.
```
if (base.currentSnapshot() == null || base.formatVersion() < 2) {
return DeleteFileIndex.builderFor(ops.io(), ImmutableList.of())
.specsById(base.specsById())
.build();
}
```
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -379,24 +423,63 @@ private void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingS
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
- * @param dataFilter an expression used to find new conflicting delete files
+ * @param dataFilter an expression used to filter new conflicting delete files
Review comment:
nit: is this necessary?
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -379,24 +423,63 @@ private void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingS
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
- * @param dataFilter an expression used to find new conflicting delete files
+ * @param dataFilter an expression used to filter new conflicting delete files
*/
protected void validateNoNewDeleteFiles(TableMetadata base, Long startingSnapshotId, Expression dataFilter) {
- // if there is no current table state, no files have been added
+ DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, dataFilter, null);
+ ValidationException.check(deletes.isEmpty(),
+ "Found new conflicting delete files that can apply to records matching %s: %s",
+ dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
+ }
+
+ /**
+ * Validates that no delete files matching a partition set have been added to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param partitionSet a partition set used to filter new conflicting delete files
+ */
+ protected void validateNoNewDeleteFiles(TableMetadata base, Long startingSnapshotId,
+ PartitionSet partitionSet) {
+ DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, null, partitionSet);
+ ValidationException.check(deletes.isEmpty(),
+ "Found new conflicting delete files that can apply to records matching %s: %s",
+ partitionSet, Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
+ }
+
+ /**
+ * Returns matching delete files have been added to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param dataFilter an expression used to filter delete files
+ * @param partitionSet a partition set used to filter delete files
+ */
+ protected DeleteFileIndex addedDeleteFiles(TableMetadata base, Long startingSnapshotId, Expression dataFilter,
+ PartitionSet partitionSet) {
+ // if there is no current table state, return empty delete file index
if (base.currentSnapshot() == null || base.formatVersion() < 2) {
- return;
+ return DeleteFileIndex.builderFor(ops.io(), Lists.newArrayList()).specsById(base.specsById()).build();
}
Pair<List<ManifestFile>, Set<Long>> history =
validationHistory(base, startingSnapshotId, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES);
List<ManifestFile> deleteManifests = history.first();
long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId);
- DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter);
+ DeleteFileIndex.Builder deleteIndexBuilder = DeleteFileIndex.builderFor(ops.io(), deleteManifests)
Review comment:
I think I'd prefer to keep `buildDeleteFileIndex` and add `PartitionSet`
to the list of its params. Then you can just call it from this method.
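Roughly this (just a sketch assembled from the calls already in this diff; the exact signature is up to you):
```
private DeleteFileIndex buildDeleteFileIndex(List<ManifestFile> deleteManifests, long startingSequenceNumber,
                                             Expression dataFilter, PartitionSet partitionSet) {
  DeleteFileIndex.Builder builder = DeleteFileIndex.builderFor(ops.io(), deleteManifests)
      .afterSequenceNumber(startingSequenceNumber)
      .caseSensitive(caseSensitive)
      .specsById(ops.current().specsById());

  if (dataFilter != null) {
    builder = builder.filterData(dataFilter);
  }

  if (partitionSet != null) {
    builder = builder.filterPartitions(partitionSet);
  }

  return builder.build();
}
```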
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -272,9 +296,37 @@ private ManifestFile copyManifest(ManifestFile manifest) {
*/
protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotId,
Expression conflictDetectionFilter) {
+ CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+ addedDataFiles(base, startingSnapshotId, conflictDetectionFilter, null);
+
+ try (CloseableIterator<ManifestEntry<DataFile>> conflicts = conflictEntries.iterator()) {
+ if (conflicts.hasNext()) {
+ throw new ValidationException("Found conflicting files that can contain records matching %s: %s",
+ conflictDetectionFilter,
+ Iterators.toString(Iterators.transform(conflicts, entry -> entry.file().path().toString())));
+ }
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to validate no appends matching %s", conflictDetectionFilter), e);
+ }
+ }
+
+ /**
+ * Returns an iterable of files matching a filter have been added to the table since a starting snapshot.
Review comment:
I think there is a bit of inconsistency between the Javadoc and method
name vs the arg and var names. I feel like we should either rename
`conflictDetectionFilter` into `dataFilter`, `conflictGroup` into
`manifestGroup` or rename the method to be something like
`conflictingAddedFiles`.
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
##########
@@ -68,4 +68,10 @@ private SparkWriteOptions() {
// Controls whether to take into account the table distribution and sort order during a write operation
public static final String USE_TABLE_DISTRIBUTION_AND_ORDERING = "use-table-distribution-and-ordering";
public static final boolean USE_TABLE_DISTRIBUTION_AND_ORDERING_DEFAULT = true;
+
+ // Identifies snapshot from which to start validating conflicting changes
+ public static final String VALIDATE_FROM_SNAPSHOT_ID = "validate-from-snapshot";
Review comment:
Should it be `validate-from-snapshot-id`? I guess the name already
matches, just the value needs to be fixed.
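i.e. something like:
```
public static final String VALIDATE_FROM_SNAPSHOT_ID = "validate-from-snapshot-id";
```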
##########
File path: core/src/main/java/org/apache/iceberg/util/PartitionSet.java
##########
@@ -183,6 +184,26 @@ public boolean removeAll(Collection<?> objects) {
return changed;
}
+ @Override
+ public String toString() {
Review comment:
Looks correct. Do you have a sample output?
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
##########
@@ -236,4 +237,19 @@ public boolean useTableDistributionAndOrdering() {
.defaultValue(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING_DEFAULT)
.parse();
}
+
+ public long validateFromSnapshotId() {
+ return confParser.longConf()
+ .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID)
+ .defaultValue(0)
Review comment:
I don't think 0 is a correct default. If we set it to 0, the validation
will throw an exception that it cannot find a snapshot with ID = 0. Should this
use `Long` and `parseOptional`? The place that calls it will have to be adapted
too.
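Something along these lines, assuming the conf parser exposes `parseOptional()` for long options here the same way it does on the read path:
```
public Long validateFromSnapshotId() {
  return confParser.longConf()
      .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID)
      .parseOptional();
}
```
Then the caller would only configure `validateFromSnapshot(...)` when the returned value is non-null.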
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
##########
@@ -68,4 +68,10 @@ private SparkWriteOptions() {
// Controls whether to take into account the table distribution and sort order during a write operation
public static final String USE_TABLE_DISTRIBUTION_AND_ORDERING = "use-table-distribution-and-ordering";
public static final boolean USE_TABLE_DISTRIBUTION_AND_ORDERING_DEFAULT = true;
+
+ // Identifies snapshot from which to start validating conflicting changes
+ public static final String VALIDATE_FROM_SNAPSHOT_ID = "validate-from-snapshot";
+
+ public static final String DYNAMIC_OVERWRITE_ISOLATION_LEVEL = "write.dynamic.overwrite.isolation-level";
Review comment:
We try to use short names for read and write options without dots to
indicate namespace as opposed to table props. This option follows the table
property convention.
What about making this property generic? We could call it `isolation-level` and then respect it in DELETE, UPDATE, and MERGE operations too (in a separate PR).
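For example (the name is only the suggestion above, not an existing option):
```
// generic write option that DELETE/UPDATE/MERGE could also respect later
public static final String ISOLATION_LEVEL = "isolation-level";
```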
##########
File path:
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestConflictValidation.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.IsolationLevel;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestConflictValidation extends SparkCatalogTestBase {
+
+ public TestConflictValidation(String catalogName, String implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @Before
+ public void createTables() {
+ sql("CREATE TABLE %s (id bigint, data string) USING iceberg PARTITIONED BY
(id)", tableName);
+ sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName);
+ }
+
+ @After
+ public void removeTables() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @Test
+ public void testDynamicValidation() throws Exception {
+ Table table = validationCatalog.loadTable(tableIdent);
+ long snapshotId = table.currentSnapshot().snapshotId();
+
+ List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"));
+ Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+ df.select("id", "data").writeTo(tableName).append();
+
+ // Validating from previous snapshot finds conflicts
+ Dataset<Row> conflictingDf = spark.createDataFrame(records, SimpleRecord.class);
+ Exception thrown = null;
+ try {
+ conflictingDf.writeTo(tableName)
+ .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(snapshotId))
+ .option(SparkWriteOptions.DYNAMIC_OVERWRITE_ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
+ .overwritePartitions();
+ } catch (Exception e) {
+ thrown = e;
+ }
+ Assert.assertNotNull("Expected validation exception but none thrown",
thrown);
+ Assert.assertTrue("Nested validation exception", thrown.getCause()
instanceof ValidationException);
+ Assert.assertTrue(thrown.getCause().getMessage().contains(
+ "Found conflicting files that can contain records matching partitions
[id=1]"));
+
+ // Validating from latest snapshot should succeed
+ table.refresh();
+ snapshotId = table.currentSnapshot().snapshotId();
+ conflictingDf.writeTo(tableName)
+ .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(snapshotId))
+ .option(SparkWriteOptions.DYNAMIC_OVERWRITE_ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
+ .overwritePartitions();
+ }
+
+ @Test
+ public void testDynamicValidationNoSnapshotId() throws Exception {
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"));
+ Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+ df.select("id", "data").writeTo(tableName).append();
+
+ // Validating from previous snapshot finds conflicts
+ Dataset<Row> conflictingDf = spark.createDataFrame(records, SimpleRecord.class);
+ Exception thrown = null;
+ try {
+ conflictingDf.writeTo(tableName)
+ .option(SparkWriteOptions.DYNAMIC_OVERWRITE_ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
+ .overwritePartitions();
+ } catch (Exception e) {
+ thrown = e;
+ }
+ Assert.assertNotNull("Expected validation exception but none thrown",
thrown);
+ Assert.assertTrue("Nested validation exception", thrown.getCause()
instanceof ValidationException);
+ Assert.assertTrue(thrown.getCause().getMessage().contains(
+ "Cannot determine history between starting snapshot"));
Review comment:
I think it is related to 0 snapshot ID used as default for
`validate-from-snapshot-id`. I hope if we fix the default value, it will fix
the error message.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]