This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 99e1089898 [core] Support Option of adding columns before partition 
(#7374)
99e1089898 is described below

commit 99e1089898046cfbca6af35a202473de2eaff1e0
Author: umi <[email protected]>
AuthorDate: Thu Mar 12 17:39:29 2026 +0800

    [core] Support Option of adding columns before partition (#7374)
---
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  14 ++
 .../org/apache/paimon/schema/SchemaManager.java    |  19 ++
 .../apache/paimon/flink/SchemaChangeITCase.java    | 198 +++++++++++++++++++++
 paimon-python/pypaimon/common/options/__init__.py  |   2 +
 .../pypaimon/common/options/core_options.py        |  13 ++
 paimon-python/pypaimon/schema/schema_manager.py    |  29 ++-
 .../pypaimon/tests/filesystem_catalog_test.py      |  82 ++++++++-
 8 files changed, 358 insertions(+), 5 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 673d988dab..e9d66c7a38 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -32,6 +32,12 @@ under the License.
             <td>Boolean</td>
             <td>Whether to remove the whole row in aggregation engine when -D 
records are received.</td>
         </tr>
+        <tr>
+            <td><h5>add-column-before-partition</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If true, when adding a new column without specifying a 
position, the column will be placed before the first partition column instead 
of at the end of the schema. This only takes effect for partitioned tables.</td>
+        </tr>
         <tr>
             <td><h5>alter-column-null-to-not-null.disabled</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index d5934b65db..d85cf9ae37 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2040,6 +2040,16 @@ public class CoreOptions implements Serializable {
                             "If true, it disables explicit type casting. For 
ex: it disables converting LONG type to INT type. "
                                     + "Users can enable this option to disable 
explicit type casting");
 
+    public static final ConfigOption<Boolean> ADD_COLUMN_BEFORE_PARTITION =
+            ConfigOptions.key("add-column-before-partition")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, when adding a new column without 
specifying a position, "
+                                    + "the column will be placed before the 
first partition column "
+                                    + "instead of at the end of the schema. "
+                                    + "This only takes effect for partitioned 
tables.");
+
     public static final ConfigOption<Long> 
COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT =
             ConfigOptions.key("commit.strict-mode.last-safe-snapshot")
                     .longType()
@@ -2955,6 +2965,10 @@ public class CoreOptions implements Serializable {
         return options.get(DISABLE_EXPLICIT_TYPE_CASTING);
     }
 
+    public boolean addColumnBeforePartition() {
+        return options.get(ADD_COLUMN_BEFORE_PARTITION);
+    }
+
     public boolean indexFileInDataFileDir() {
         return options.get(INDEX_FILE_IN_DATA_FILE_DIR);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index e726b80326..81585a1d44 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -293,6 +293,14 @@ public class SchemaManager implements Serializable {
                                 CoreOptions.DISABLE_EXPLICIT_TYPE_CASTING
                                         .defaultValue()
                                         .toString()));
+
+        boolean addColumnBeforePartition =
+                Boolean.parseBoolean(
+                        oldOptions.getOrDefault(
+                                CoreOptions.ADD_COLUMN_BEFORE_PARTITION.key(),
+                                
CoreOptions.ADD_COLUMN_BEFORE_PARTITION.defaultValue().toString()));
+        List<String> partitionKeys = oldTableSchema.partitionKeys();
+
         List<DataField> newFields = new ArrayList<>(oldTableSchema.fields());
         AtomicInteger highestFieldId = new 
AtomicInteger(oldTableSchema.highestFieldId());
         String newComment = oldTableSchema.comment();
@@ -368,6 +376,17 @@ public class SchemaManager implements Serializable {
                                 throw new UnsupportedOperationException(
                                         "Unsupported move type: " + 
move.type());
                             }
+                        } else if (addColumnBeforePartition
+                                && !partitionKeys.isEmpty()
+                                && addColumn.fieldNames().length == 1) {
+                            int insertIndex = newFields.size();
+                            for (int i = 0; i < newFields.size(); i++) {
+                                if 
(partitionKeys.contains(newFields.get(i).name())) {
+                                    insertIndex = i;
+                                    break;
+                                }
+                            }
+                            newFields.add(insertIndex, dataField);
                         } else {
                             newFields.add(dataField);
                         }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index 9084b55d60..840dce7942 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -1578,4 +1578,202 @@ public class SchemaChangeITCase extends 
CatalogITCaseBase {
         sql("ALTER TABLE T MODIFY v INT");
         assertThat(sql("SELECT * FROM T")).containsExactlyInAnyOrder(Row.of(1, 
10), Row.of(2, 20));
     }
+
+    @Test
+    public void testAddColumnBeforePartitionEnabled() {
+        sql(
+                "CREATE TABLE T_PART (\n"
+                        + "    user_id BIGINT,\n"
+                        + "    item_id BIGINT,\n"
+                        + "    behavior STRING,\n"
+                        + "    dt STRING,\n"
+                        + "    hh STRING\n"
+                        + ") PARTITIONED BY (dt, hh) WITH (\n"
+                        + "    'add-column-before-partition' = 'true'\n"
+                        + ")");
+
+        sql("INSERT INTO T_PART VALUES(1, 100, 'buy', '2024-01-01', '10')");
+
+        // Add column without specifying position
+        sql("ALTER TABLE T_PART ADD score DOUBLE");
+
+        List<Row> result = sql("SHOW CREATE TABLE T_PART");
+        assertThat(result.toString())
+                .contains(
+                        "`user_id` BIGINT,\n"
+                                + "  `item_id` BIGINT,\n"
+                                + "  `behavior` VARCHAR(2147483647),\n"
+                                + "  `score` DOUBLE,\n"
+                                + "  `dt` VARCHAR(2147483647),\n"
+                                + "  `hh` VARCHAR(2147483647)");
+
+        sql("INSERT INTO T_PART VALUES(2, 200, 'sell', 99.5, '2024-01-02', 
'11')");
+        result = sql("SELECT * FROM T_PART");
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1L, 100L, "buy", null, "2024-01-01", "10"),
+                        Row.of(2L, 200L, "sell", 99.5, "2024-01-02", "11"));
+    }
+
+    @Test
+    public void testAddColumnBeforePartitionDisabledByDefault() {
+        sql(
+                "CREATE TABLE T_PART_DEFAULT (\n"
+                        + "    user_id BIGINT,\n"
+                        + "    item_id BIGINT,\n"
+                        + "    dt STRING\n"
+                        + ") PARTITIONED BY (dt)");
+
+        // Add column without specifying position (default behavior)
+        sql("ALTER TABLE T_PART_DEFAULT ADD score DOUBLE");
+
+        List<Row> result = sql("SHOW CREATE TABLE T_PART_DEFAULT");
+        // score should be appended at the end
+        assertThat(result.toString())
+                .contains(
+                        "`user_id` BIGINT,\n"
+                                + "  `item_id` BIGINT,\n"
+                                + "  `dt` VARCHAR(2147483647),\n"
+                                + "  `score` DOUBLE");
+    }
+
+    @Test
+    public void testAddColumnBeforePartitionWithExplicitPosition() {
+        sql(
+                "CREATE TABLE T_PART_POS (\n"
+                        + "    user_id BIGINT,\n"
+                        + "    item_id BIGINT,\n"
+                        + "    dt STRING\n"
+                        + ") PARTITIONED BY (dt) WITH (\n"
+                        + "    'add-column-before-partition' = 'true'\n"
+                        + ")");
+
+        // Add column with explicit FIRST position, should respect explicit 
position
+        sql("ALTER TABLE T_PART_POS ADD score DOUBLE FIRST");
+
+        List<Row> result = sql("SHOW CREATE TABLE T_PART_POS");
+        assertThat(result.toString())
+                .contains(
+                        "`score` DOUBLE,\n"
+                                + "  `user_id` BIGINT,\n"
+                                + "  `item_id` BIGINT,\n"
+                                + "  `dt` VARCHAR(2147483647)");
+    }
+
+    @Test
+    public void testAddColumnBeforePartitionViaAlterOption() {
+        sql(
+                "CREATE TABLE T_PART_ALTER (\n"
+                        + "    user_id BIGINT,\n"
+                        + "    item_id BIGINT,\n"
+                        + "    dt STRING\n"
+                        + ") PARTITIONED BY (dt)");
+
+        // First add column without config (default: append at end)
+        sql("ALTER TABLE T_PART_ALTER ADD col1 INT");
+        List<Row> result = sql("SHOW CREATE TABLE T_PART_ALTER");
+        assertThat(result.toString())
+                .contains(
+                        "`user_id` BIGINT,\n"
+                                + "  `item_id` BIGINT,\n"
+                                + "  `dt` VARCHAR(2147483647),\n"
+                                + "  `col1` INT");
+
+        // Enable config via ALTER TABLE SET
+        sql("ALTER TABLE T_PART_ALTER SET ('add-column-before-partition' = 
'true')");
+
+        // Now add another column, should go before partition column dt
+        sql("ALTER TABLE T_PART_ALTER ADD col2 DOUBLE");
+        result = sql("SHOW CREATE TABLE T_PART_ALTER");
+        assertThat(result.toString())
+                .contains(
+                        "`user_id` BIGINT,\n"
+                                + "  `item_id` BIGINT,\n"
+                                + "  `col2` DOUBLE,\n"
+                                + "  `dt` VARCHAR(2147483647),\n"
+                                + "  `col1` INT");
+    }
+
+    @Test
+    public void testAddMultipleColumnsBeforePartition() {
+        sql(
+                "CREATE TABLE T_PART_MULTI (\n"
+                        + "    user_id BIGINT,\n"
+                        + "    item_id BIGINT,\n"
+                        + "    dt STRING,\n"
+                        + "    hh STRING\n"
+                        + ") PARTITIONED BY (dt, hh) WITH (\n"
+                        + "    'add-column-before-partition' = 'true'\n"
+                        + ")");
+
+        // Add first column
+        sql("ALTER TABLE T_PART_MULTI ADD col1 INT");
+        // Add second column
+        sql("ALTER TABLE T_PART_MULTI ADD ( col2 INT, col3 DOUBLE )");
+
+        List<Row> result = sql("SHOW CREATE TABLE T_PART_MULTI");
+        // Both new columns should be before partition columns dt and hh
+        assertThat(result.toString())
+                .contains(
+                        "`user_id` BIGINT,\n"
+                                + "  `item_id` BIGINT,\n"
+                                + "  `col1` INT,\n"
+                                + "  `col2` INT,\n"
+                                + "  `col3` DOUBLE,\n"
+                                + "  `dt` VARCHAR(2147483647),\n"
+                                + "  `hh` VARCHAR(2147483647)");
+    }
+
+    @Test
+    public void testAddColumnBeforePartitionOnPrimaryKeyTable() {
+        sql(
+                "CREATE TABLE T_PK_PART (\n"
+                        + "    user_id BIGINT,\n"
+                        + "    item_id BIGINT,\n"
+                        + "    behavior STRING,\n"
+                        + "    dt STRING,\n"
+                        + "    hh STRING,\n"
+                        + "    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (dt, hh) WITH (\n"
+                        + "    'add-column-before-partition' = 'true'\n"
+                        + ")");
+
+        sql("INSERT INTO T_PK_PART VALUES(1, 100, 'buy', '2024-01-01', '10')");
+
+        //  Add single column
+        sql("ALTER TABLE T_PK_PART ADD score DOUBLE");
+
+        List<Row> result = sql("SHOW CREATE TABLE T_PK_PART");
+        assertThat(result.toString())
+                .contains(
+                        "`user_id` BIGINT NOT NULL,\n"
+                                + "  `item_id` BIGINT,\n"
+                                + "  `behavior` VARCHAR(2147483647),\n"
+                                + "  `score` DOUBLE,\n"
+                                + "  `dt` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `hh` VARCHAR(2147483647) NOT NULL");
+
+        // Add multiple columns
+        sql("ALTER TABLE T_PK_PART ADD ( col1 INT, col2 DOUBLE )");
+
+        result = sql("SHOW CREATE TABLE T_PK_PART");
+        assertThat(result.toString())
+                .contains(
+                        "`user_id` BIGINT NOT NULL,\n"
+                                + "  `item_id` BIGINT,\n"
+                                + "  `behavior` VARCHAR(2147483647),\n"
+                                + "  `score` DOUBLE,\n"
+                                + "  `col1` INT,\n"
+                                + "  `col2` DOUBLE,\n"
+                                + "  `dt` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `hh` VARCHAR(2147483647) NOT NULL");
+
+        // Verify data read/write still works
+        sql("INSERT INTO T_PK_PART VALUES(2, 200, 'sell', 99.5, 10, 3.14, 
'2024-01-02', '11')");
+        result = sql("SELECT * FROM T_PK_PART");
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1L, 100L, "buy", null, null, null, 
"2024-01-01", "10"),
+                        Row.of(2L, 200L, "sell", 99.5, 10, 3.14, "2024-01-02", 
"11"));
+    }
 }
diff --git a/paimon-python/pypaimon/common/options/__init__.py 
b/paimon-python/pypaimon/common/options/__init__.py
index b02d7bb5f4..8760397beb 100644
--- a/paimon-python/pypaimon/common/options/__init__.py
+++ b/paimon-python/pypaimon/common/options/__init__.py
@@ -19,10 +19,12 @@ limitations under the License.
 from .config_option import ConfigOption, Description
 from .config_options import ConfigOptions
 from .options import Options
+from .core_options import CoreOptions
 
 __all__ = [
     'ConfigOption',
     'Description',
     'ConfigOptions',
     'Options',
+    'CoreOptions'
 ]
diff --git a/paimon-python/pypaimon/common/options/core_options.py 
b/paimon-python/pypaimon/common/options/core_options.py
index 85cf965f91..6e6fc4108b 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -376,6 +376,16 @@ class CoreOptions:
         .with_description("Read batch size for any file format if it 
supports.")
     )
 
+    ADD_COLUMN_BEFORE_PARTITION: ConfigOption[bool] = (
+        ConfigOptions.key("add-column-before-partition")
+        .boolean_type()
+        .default_value(False)
+        .with_description(
+            "When adding a new column, if the table has partition keys, "
+            "insert the new column before the first partition column by 
default."
+        )
+    )
+
     def __init__(self, options: Options):
         self.options = options
 
@@ -540,3 +550,6 @@ class CoreOptions:
 
     def read_batch_size(self, default=None) -> int:
         return self.options.get(CoreOptions.READ_BATCH_SIZE, default or 1024)
+
+    def add_column_before_partition(self) -> bool:
+        return self.options.get(CoreOptions.ADD_COLUMN_BEFORE_PARTITION, False)
diff --git a/paimon-python/pypaimon/schema/schema_manager.py 
b/paimon-python/pypaimon/schema/schema_manager.py
index c8f007ba08..20298be7e0 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -20,6 +20,7 @@ from typing import Optional, List
 from pypaimon.catalog.catalog_exception import ColumnAlreadyExistException, 
ColumnNotExistException
 from pypaimon.common.file_io import FileIO
 from pypaimon.common.json_util import JSON
+from pypaimon.common.options import Options, CoreOptions
 from pypaimon.schema.data_types import AtomicInteger, DataField
 from pypaimon.schema.schema import Schema
 from pypaimon.schema.schema_change import (
@@ -181,7 +182,11 @@ def _apply_move(fields: List[DataField], new_field: 
Optional[DataField], move):
 
 
 def _handle_add_column(
-        change: AddColumn, new_fields: List[DataField], highest_field_id: 
AtomicInteger
+    change: AddColumn,
+    new_fields: List[DataField],
+    highest_field_id: AtomicInteger,
+    partition_keys: List[str],
+    add_column_before_partition: bool
 ):
     if not change.data_type.nullable:
         raise ValueError(
@@ -194,6 +199,17 @@ def _handle_add_column(
     new_field = DataField(field_id, field_name, change.data_type, 
change.comment)
     if change.move:
         _apply_move(new_fields, new_field, change.move)
+    elif (
+        add_column_before_partition
+        and partition_keys
+        and len(change.field_names) == 1
+    ):
+        insert_index = len(new_fields)
+        for i, field in enumerate(new_fields):
+            if field.name in partition_keys:
+                insert_index = i
+                break
+        new_fields.insert(insert_index, new_field)
     else:
         new_fields.append(new_field)
 
@@ -278,7 +294,7 @@ class SchemaManager:
                     f"Table schema does not exist at path: {self.table_path}. "
                     "This may happen if the table was deleted concurrently."
                 )
-            
+
             new_table_schema = self._generate_table_schema(old_table_schema, 
changes)
             try:
                 success = self.commit(new_table_schema)
@@ -306,6 +322,10 @@ class SchemaManager:
         highest_field_id = AtomicInteger(old_table_schema.highest_field_id)
         new_comment = old_table_schema.comment
 
+        # Get add_column_before_partition option
+        add_column_before_partition = 
CoreOptions(Options(old_table_schema.options)).add_column_before_partition()
+        partition_keys = old_table_schema.partition_keys
+
         for change in changes:
             if isinstance(change, SetOption):
                 new_options[change.key] = change.value
@@ -314,7 +334,10 @@ class SchemaManager:
             elif isinstance(change, UpdateComment):
                 new_comment = change.comment
             elif isinstance(change, AddColumn):
-                _handle_add_column(change, new_fields, highest_field_id)
+                _handle_add_column(
+                    change, new_fields, highest_field_id,
+                    partition_keys, add_column_before_partition
+                )
             elif isinstance(change, RenameColumn):
                 _assert_not_updating_partition_keys(
                     old_table_schema, change.field_names, "rename"
diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py 
b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
index fe2c19f18a..c778c27082 100644
--- a/paimon-python/pypaimon/tests/filesystem_catalog_test.py
+++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
@@ -172,6 +172,84 @@ class FileSystemCatalogTest(unittest.TestCase):
         table = catalog.get_table(identifier)
         self.assertEqual(len(table.fields), 2)
 
+    def test_add_column_before_partition(self):
+        catalog = CatalogFactory.create({
+            "warehouse": self.warehouse
+        })
+        catalog.create_database("test_db", False)
+
+        identifier = "test_db.test_table"
+        schema = Schema(
+            fields=[
+                DataField.from_dict({"id": 0, "name": "col1", "type": 
"STRING", "description": "field1"}),
+                DataField.from_dict(
+                    {"id": 1, "name": "partition_col", "type": "STRING", 
"description": "partition field"})
+            ],
+            partition_keys=["partition_col"],
+            primary_keys=[],
+            options={},
+            comment="comment"
+        )
+        catalog.create_table(identifier, schema, False)
+
+        table = catalog.get_table(identifier)
+        self.assertEqual(len(table.fields), 2)
+        self.assertEqual(table.fields[1].name, "partition_col")
+
+        catalog.alter_table(
+            identifier,
+            [SchemaChange.set_option("add-column-before-partition", "true")],
+            False
+        )
+
+        catalog.alter_table(
+            identifier,
+            [SchemaChange.add_column("new_col", AtomicType("INT"))],
+            False
+        )
+        table = catalog.get_table(identifier)
+        self.assertEqual(len(table.fields), 3)
+        self.assertEqual(table.fields[0].name, "col1")
+        self.assertEqual(table.fields[1].name, "new_col")
+        self.assertEqual(table.fields[2].name, "partition_col")
+
+        catalog.alter_table(
+            identifier,
+            [SchemaChange.add_column("col_multi1", AtomicType("INT")),
+             SchemaChange.add_column("col_multi2", AtomicType("STRING")),
+             SchemaChange.add_column("col_multi3", AtomicType("DOUBLE"))],
+            False
+        )
+        table = catalog.get_table(identifier)
+        self.assertEqual(len(table.fields), 6)
+        self.assertEqual(table.fields[0].name, "col1")
+        self.assertEqual(table.fields[1].name, "new_col")
+        self.assertEqual(table.fields[2].name, "col_multi1")
+        self.assertEqual(table.fields[3].name, "col_multi2")
+        self.assertEqual(table.fields[4].name, "col_multi3")
+        self.assertEqual(table.fields[5].name, "partition_col")
+
+        catalog.alter_table(
+            identifier,
+            [SchemaChange.set_option("add-column-before-partition", "false")],
+            False
+        )
+
+        catalog.alter_table(
+            identifier,
+            [SchemaChange.add_column("another_col", AtomicType("BIGINT"))],
+            False
+        )
+        table = catalog.get_table(identifier)
+        self.assertEqual(len(table.fields), 7)
+        self.assertEqual(table.fields[0].name, "col1")
+        self.assertEqual(table.fields[1].name, "new_col")
+        self.assertEqual(table.fields[2].name, "col_multi1")
+        self.assertEqual(table.fields[3].name, "col_multi2")
+        self.assertEqual(table.fields[4].name, "col_multi3")
+        self.assertEqual(table.fields[5].name, "partition_col")
+        self.assertEqual(table.fields[6].name, "another_col")
+
     def test_get_database_propagates_exists_error(self):
         catalog = CatalogFactory.create({
             "warehouse": self.warehouse
@@ -186,7 +264,7 @@ class FileSystemCatalogTest(unittest.TestCase):
         from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
         self.assertIsInstance(catalog, FileSystemCatalog)
         filesystem_catalog = catalog  # type: FileSystemCatalog
-        
+
         original_exists = filesystem_catalog.file_io.exists
         filesystem_catalog.file_io.exists = 
MagicMock(side_effect=OSError("Permission denied"))
 
@@ -195,6 +273,6 @@ class FileSystemCatalogTest(unittest.TestCase):
             catalog.get_database("test_db")
         self.assertIn("Permission denied", str(context.exception))
         self.assertNotIsInstance(context.exception, DatabaseNotExistException)
-        
+
         # Restore original method
         filesystem_catalog.file_io.exists = original_exists

Reply via email to