rdblue commented on a change in pull request #2141:
URL: https://github.com/apache/iceberg/pull/2141#discussion_r563949252
##########
File path:
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -233,47 +215,23 @@ case class RewriteMergeInto(spark: SparkSession) extends
Rule[LogicalPlan] with
!(actions.size == 1 && hasUnconditionalDelete(actions.headOption))
}
- def buildWritePlan(
- childPlan: LogicalPlan,
- table: Table): LogicalPlan = {
- val defaultDistributionMode = table match {
- case iceberg: SparkTable if !iceberg.table.sortOrder.isUnsorted =>
- TableProperties.WRITE_DISTRIBUTION_MODE_RANGE
- case _ =>
- TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT
- }
-
+ private def buildWritePlan(childPlan: LogicalPlan, table: Table):
LogicalPlan = {
table match {
- case iceTable: SparkTable =>
- val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
- val table = iceTable.table()
- val distributionMode: String = table.properties
- .getOrDefault(TableProperties.WRITE_DISTRIBUTION_MODE,
defaultDistributionMode)
- val order = toCatalyst(toOrderedDistribution(table.spec(),
table.sortOrder(), true), childPlan)
- DistributionMode.fromName(distributionMode) match {
- case DistributionMode.NONE =>
- Sort(buildSortOrder(order), global = false, childPlan)
- case DistributionMode.HASH =>
- val clustering = toCatalyst(toClusteredDistribution(table.spec()),
childPlan)
- val hashPartitioned = RepartitionByExpression(clustering,
childPlan, numShufflePartitions)
- Sort(buildSortOrder(order), global = false, hashPartitioned)
- case DistributionMode.RANGE =>
- val roundRobin = Repartition(numShufflePartitions, shuffle = true,
childPlan)
- Sort(buildSortOrder(order), global = true, roundRobin)
+ case iceberg: SparkTable =>
+ val distribution = Spark3Util.buildRequiredDistribution(iceberg.table)
Review comment:
Can we expose the same interface as in Spark 3.2 from `Table` and use
that here instead? Then `Spark3Util` calls remain in `SparkTable`.
##########
File path:
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -19,49 +19,30 @@
package org.apache.spark.sql.catalyst.optimizer
-import org.apache.iceberg.DistributionMode
-import org.apache.iceberg.TableProperties
-import org.apache.iceberg.TableProperties.MERGE_CARDINALITY_CHECK_ENABLED
-import
org.apache.iceberg.TableProperties.MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT
-import org.apache.iceberg.spark.Spark3Util.toClusteredDistribution
-import org.apache.iceberg.spark.Spark3Util.toOrderedDistribution
+import org.apache.iceberg.TableProperties.{MERGE_CARDINALITY_CHECK_ENABLED,
MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT}
+import org.apache.iceberg.spark.Spark3Util
import org.apache.iceberg.spark.source.SparkTable
import org.apache.iceberg.util.PropertyUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.expressions.Ascending
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.IsNull
import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.expressions.NullsFirst
-import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.FullOuter
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.LeftAnti
import org.apache.spark.sql.catalyst.plans.RightOuter
-import org.apache.spark.sql.catalyst.plans.logical.AppendData
-import org.apache.spark.sql.catalyst.plans.logical.DeleteAction
-import org.apache.spark.sql.catalyst.plans.logical.InsertAction
-import org.apache.spark.sql.catalyst.plans.logical.Join
-import org.apache.spark.sql.catalyst.plans.logical.JoinHint
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.MergeAction
-import org.apache.spark.sql.catalyst.plans.logical.MergeInto
-import org.apache.spark.sql.catalyst.plans.logical.MergeIntoParams
-import org.apache.spark.sql.catalyst.plans.logical.MergeIntoTable
-import org.apache.spark.sql.catalyst.plans.logical.Project
-import org.apache.spark.sql.catalyst.plans.logical.Repartition
-import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
-import org.apache.spark.sql.catalyst.plans.logical.ReplaceData
-import org.apache.spark.sql.catalyst.plans.logical.Sort
-import org.apache.spark.sql.catalyst.plans.logical.UpdateAction
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, DeleteAction,
InsertAction, Join, JoinHint, LogicalPlan, MergeAction, MergeInto,
MergeIntoParams, MergeIntoTable, Project, Repartition, ReplaceData,
UpdateAction}
Review comment:
Can you revert this change? I'll open a PR to update the scala style
checks.
##########
File path: core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.util.Collection;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.transforms.SortOrderVisitor;
+
+public class SortOrderUtil {
+
+ private SortOrderUtil() {
+ }
+
+ public static SortOrder buildSortOrder(Table table) {
+ return buildSortOrder(table.spec(), table.sortOrder());
+ }
+
+ public static SortOrder buildSortOrder(PartitionSpec spec, SortOrder
sortOrder) {
+ if (sortOrder.isUnsorted() && spec.isUnpartitioned()) {
Review comment:
Should this return `SortOrder.unsorted()` instead?
##########
File path:
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -233,47 +215,23 @@ case class RewriteMergeInto(spark: SparkSession) extends
Rule[LogicalPlan] with
!(actions.size == 1 && hasUnconditionalDelete(actions.headOption))
}
- def buildWritePlan(
- childPlan: LogicalPlan,
- table: Table): LogicalPlan = {
- val defaultDistributionMode = table match {
- case iceberg: SparkTable if !iceberg.table.sortOrder.isUnsorted =>
- TableProperties.WRITE_DISTRIBUTION_MODE_RANGE
- case _ =>
- TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT
- }
-
+ private def buildWritePlan(childPlan: LogicalPlan, table: Table):
LogicalPlan = {
table match {
- case iceTable: SparkTable =>
- val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
- val table = iceTable.table()
- val distributionMode: String = table.properties
- .getOrDefault(TableProperties.WRITE_DISTRIBUTION_MODE,
defaultDistributionMode)
- val order = toCatalyst(toOrderedDistribution(table.spec(),
table.sortOrder(), true), childPlan)
- DistributionMode.fromName(distributionMode) match {
- case DistributionMode.NONE =>
- Sort(buildSortOrder(order), global = false, childPlan)
- case DistributionMode.HASH =>
- val clustering = toCatalyst(toClusteredDistribution(table.spec()),
childPlan)
- val hashPartitioned = RepartitionByExpression(clustering,
childPlan, numShufflePartitions)
- Sort(buildSortOrder(order), global = false, hashPartitioned)
- case DistributionMode.RANGE =>
- val roundRobin = Repartition(numShufflePartitions, shuffle = true,
childPlan)
- Sort(buildSortOrder(order), global = true, roundRobin)
+ case iceberg: SparkTable =>
+ val distribution = Spark3Util.buildRequiredDistribution(iceberg.table)
+ val ordering = Spark3Util.buildRequiredOrdering(distribution,
iceberg.table)
+ val newChildPlan = distribution match {
+ case _: OrderedDistribution =>
+ // insert a round robin partitioning to avoid executing the join
twice
+ val numShufflePartitions = conf.numShufflePartitions
+ Repartition(numShufflePartitions, shuffle = true, childPlan)
Review comment:
Is this here and not in `prepareQuery` because we don't want to assume
that a global ordering always requires an extra round-robin repartition?
##########
File path:
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -233,47 +215,23 @@ case class RewriteMergeInto(spark: SparkSession) extends
Rule[LogicalPlan] with
!(actions.size == 1 && hasUnconditionalDelete(actions.headOption))
}
- def buildWritePlan(
- childPlan: LogicalPlan,
- table: Table): LogicalPlan = {
- val defaultDistributionMode = table match {
- case iceberg: SparkTable if !iceberg.table.sortOrder.isUnsorted =>
- TableProperties.WRITE_DISTRIBUTION_MODE_RANGE
- case _ =>
- TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT
- }
-
+ private def buildWritePlan(childPlan: LogicalPlan, table: Table):
LogicalPlan = {
table match {
- case iceTable: SparkTable =>
- val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
- val table = iceTable.table()
- val distributionMode: String = table.properties
- .getOrDefault(TableProperties.WRITE_DISTRIBUTION_MODE,
defaultDistributionMode)
- val order = toCatalyst(toOrderedDistribution(table.spec(),
table.sortOrder(), true), childPlan)
- DistributionMode.fromName(distributionMode) match {
- case DistributionMode.NONE =>
- Sort(buildSortOrder(order), global = false, childPlan)
- case DistributionMode.HASH =>
- val clustering = toCatalyst(toClusteredDistribution(table.spec()),
childPlan)
- val hashPartitioned = RepartitionByExpression(clustering,
childPlan, numShufflePartitions)
- Sort(buildSortOrder(order), global = false, hashPartitioned)
- case DistributionMode.RANGE =>
- val roundRobin = Repartition(numShufflePartitions, shuffle = true,
childPlan)
- Sort(buildSortOrder(order), global = true, roundRobin)
+ case iceberg: SparkTable =>
+ val distribution = Spark3Util.buildRequiredDistribution(iceberg.table)
+ val ordering = Spark3Util.buildRequiredOrdering(distribution,
iceberg.table)
+ val newChildPlan = distribution match {
+ case _: OrderedDistribution =>
+ // insert a round robin partitioning to avoid executing the join
twice
+ val numShufflePartitions = conf.numShufflePartitions
+ Repartition(numShufflePartitions, shuffle = true, childPlan)
Review comment:
Is this here and not in `prepareQuery` because we don't want to assume
that a global ordering always requires an extra round-robin repartition? If so,
it would be good to move the comment above `newChildPlan` and make it
clearer why this extra step is happening.
##########
File path:
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/DistributionAndOrderingUtils.scala
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.utils
+
+import org.apache.spark.sql.{catalyst, AnalysisException}
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.expressions.{IcebergBucketTransform,
IcebergDayTransform, IcebergHourTransform, IcebergMonthTransform,
IcebergYearTransform, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
RepartitionByExpression, Sort}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.quoteIfNeeded
+import org.apache.spark.sql.connector.expressions.{BucketTransform,
DaysTransform, HoursTransform, MonthsTransform, NamedReference, Transform,
YearsTransform}
+import org.apache.spark.sql.connector.expressions.{Expression, FieldReference,
IdentityTransform}
+import
org.apache.spark.sql.connector.iceberg.distributions.{ClusteredDistribution,
Distribution, OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.connector.iceberg.expressions.{NullOrdering,
SortDirection, SortOrder}
+import org.apache.spark.sql.internal.SQLConf
Review comment:
Could you rewrite imports to one per line?
##########
File path: spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
##########
@@ -279,46 +280,39 @@ public Transform unknown(int fieldId, String sourceName,
int sourceId, String tr
return transforms.toArray(new Transform[0]);
}
- public static Distribution toOrderedDistribution(PartitionSpec spec,
SortOrder sortOrder, boolean inferFromSpec) {
- if (sortOrder.isUnsorted()) {
- if (inferFromSpec) {
- SortOrder specOrder = Partitioning.sortOrderFor(spec);
- return Distributions.ordered(convert(specOrder));
- }
-
- return Distributions.unspecified();
- }
-
- Schema schema = spec.schema();
- Multimap<Integer, SortField> sortFieldIndex =
Multimaps.index(sortOrder.fields(), SortField::sourceId);
-
- // build a sort prefix of partition fields that are not already in the
sort order
- SortOrder.Builder builder = SortOrder.builderFor(schema);
- for (PartitionField field : spec.fields()) {
- Collection<SortField> sortFields = sortFieldIndex.get(field.sourceId());
- boolean isSorted = sortFields.stream().anyMatch(sortField ->
- field.transform().equals(sortField.transform()) ||
sortField.transform().satisfiesOrderOf(field.transform()));
- if (!isSorted) {
- String sourceName = schema.findColumnName(field.sourceId());
-
builder.asc(org.apache.iceberg.expressions.Expressions.transform(sourceName,
field.transform()));
- }
+ public static Distribution
buildRequiredDistribution(org.apache.iceberg.Table table) {
+ DistributionMode distributionMode = getDistributionMode(table);
+ switch (distributionMode) {
+ case NONE:
+ return Distributions.unspecified();
+ case HASH:
+ return Distributions.clustered(toTransforms(table.spec()));
Review comment:
Is it correct to return a clustered distribution with no expressions if
the spec is unpartitioned? I would rather return
`Distributions.unspecified()` just to be safe when passing this back to Spark.
##########
File path: core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.util.Collection;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.transforms.SortOrderVisitor;
+
+public class SortOrderUtil {
+
+ private SortOrderUtil() {
+ }
+
+ public static SortOrder buildSortOrder(Table table) {
+ return buildSortOrder(table.spec(), table.sortOrder());
+ }
+
+ public static SortOrder buildSortOrder(PartitionSpec spec, SortOrder
sortOrder) {
+ if (sortOrder.isUnsorted() && spec.isUnpartitioned()) {
+ return sortOrder;
+ }
+
+ Schema schema = spec.schema();
+
+ Multimap<Integer, SortField> sortFieldIndex =
Multimaps.index(sortOrder.fields(), SortField::sourceId);
+
+ // build a sort prefix of partition fields that are not already in the
sort order
+ SortOrder.Builder builder = SortOrder.builderFor(schema);
Review comment:
The logic here is unchanged from what was in `Spark3Util`, right?
##########
File path:
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -233,47 +215,23 @@ case class RewriteMergeInto(spark: SparkSession) extends
Rule[LogicalPlan] with
!(actions.size == 1 && hasUnconditionalDelete(actions.headOption))
}
- def buildWritePlan(
- childPlan: LogicalPlan,
- table: Table): LogicalPlan = {
- val defaultDistributionMode = table match {
- case iceberg: SparkTable if !iceberg.table.sortOrder.isUnsorted =>
- TableProperties.WRITE_DISTRIBUTION_MODE_RANGE
- case _ =>
- TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT
- }
-
+ private def buildWritePlan(childPlan: LogicalPlan, table: Table):
LogicalPlan = {
table match {
- case iceTable: SparkTable =>
- val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
- val table = iceTable.table()
- val distributionMode: String = table.properties
- .getOrDefault(TableProperties.WRITE_DISTRIBUTION_MODE,
defaultDistributionMode)
- val order = toCatalyst(toOrderedDistribution(table.spec(),
table.sortOrder(), true), childPlan)
- DistributionMode.fromName(distributionMode) match {
- case DistributionMode.NONE =>
- Sort(buildSortOrder(order), global = false, childPlan)
- case DistributionMode.HASH =>
- val clustering = toCatalyst(toClusteredDistribution(table.spec()),
childPlan)
- val hashPartitioned = RepartitionByExpression(clustering,
childPlan, numShufflePartitions)
- Sort(buildSortOrder(order), global = false, hashPartitioned)
- case DistributionMode.RANGE =>
- val roundRobin = Repartition(numShufflePartitions, shuffle = true,
childPlan)
- Sort(buildSortOrder(order), global = true, roundRobin)
+ case iceberg: SparkTable =>
+ val distribution = Spark3Util.buildRequiredDistribution(iceberg.table)
Review comment:
You're right. I was thinking it was on Table instead. Let's go with this
then.
##########
File path: spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
##########
@@ -279,46 +280,39 @@ public Transform unknown(int fieldId, String sourceName,
int sourceId, String tr
return transforms.toArray(new Transform[0]);
}
- public static Distribution toOrderedDistribution(PartitionSpec spec,
SortOrder sortOrder, boolean inferFromSpec) {
- if (sortOrder.isUnsorted()) {
- if (inferFromSpec) {
- SortOrder specOrder = Partitioning.sortOrderFor(spec);
- return Distributions.ordered(convert(specOrder));
- }
-
- return Distributions.unspecified();
- }
-
- Schema schema = spec.schema();
- Multimap<Integer, SortField> sortFieldIndex =
Multimaps.index(sortOrder.fields(), SortField::sourceId);
-
- // build a sort prefix of partition fields that are not already in the
sort order
- SortOrder.Builder builder = SortOrder.builderFor(schema);
- for (PartitionField field : spec.fields()) {
- Collection<SortField> sortFields = sortFieldIndex.get(field.sourceId());
- boolean isSorted = sortFields.stream().anyMatch(sortField ->
- field.transform().equals(sortField.transform()) ||
sortField.transform().satisfiesOrderOf(field.transform()));
- if (!isSorted) {
- String sourceName = schema.findColumnName(field.sourceId());
-
builder.asc(org.apache.iceberg.expressions.Expressions.transform(sourceName,
field.transform()));
- }
+ public static Distribution
buildRequiredDistribution(org.apache.iceberg.Table table) {
+ DistributionMode distributionMode = getDistributionMode(table);
+ switch (distributionMode) {
+ case NONE:
+ return Distributions.unspecified();
+ case HASH:
+ return Distributions.clustered(toTransforms(table.spec()));
Review comment:
A sorted table may not be partitioned, but it would pass the check you
added. Then, if the distribution mode is HASH, it would return an empty
clustered distribution. I think it would be more correct and easier to reason
about if the check were done here.
##########
File path:
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/IcebergImplicits.scala
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.utils
+
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.connector.catalog.Table
+
+object IcebergImplicits {
+ implicit class TableHelper(table: Table) {
+ def asIcebergTable: org.apache.iceberg.Table = {
Review comment:
Should this be named `toIcebergTable` or `icebergTable`? This is doing
more than just a cast; it is accessing the underlying table.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]