This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 794cdbb  Optimize memory:Support shrinking in 
ConcurrentLongLongPairHashMap (#3061)
794cdbb is described below

commit 794cdbb5fc6e13e19865b98a386430f103b7408a
Author: lin chen <[email protected]>
AuthorDate: Tue Feb 22 13:01:03 2022 +0800

    Optimize memory:Support shrinking in ConcurrentLongLongPairHashMap (#3061)
    
    * support shrink
    
    * Reduce unnecessary rehash
    
    * check style
    
    * fix: unnecessary rehash
    
    * add unit test: testExpandAndShrink
    
    * fix unit test: testExpandAndShrink
    
    * fix test:
    1.verify that the map is able to expand after shrink;
    2.does not keep shrinking at every remove() operation;
    
    * 1.add builder;
    2.add config:
      ①MapFillFactor;②MapIdleFactor;③autoShrink;④expandFactor;⑤shrinkFactor
    
    * check style
    
    * 1.check style;
    2.add check :
    shrinkFactor>1
    expandFactor>1
    
    * check style
    
    * keep and Deprecate all the public constructors.
    
    * add final for  autoShrink
    
    * fix unit test testExpandAndShrink, set autoShrink true
    
    * add methods to update parameter values:
      ①setMapFillFactor
      ②setMapIdleFactor
      ③setExpandFactor
      ④setShrinkFactor
      ⑤setAutoShrink
    
    * use lombok.Setter replace lombok.Data
    
    * use public for getUsedBucketCount
    
    * 1.check parameters;
    2.fix the shrinkage condition:
      ①newCapacity > size: in order to prevent the infinite loop of rehash, 
newCapacity should be larger than the currently used size;
      ②newCapacity > resizeThresholdUp: in order to prevent continuous 
expansion and contraction, newCapacity should be greater than the expansion 
threshold;
    
    * 1.update  parameters check;
    2.fix newCapacity calculation when shrinking :
      rehash((int) Math.max(size / mapFillFactor, capacity / shrinkFactor));
    
    * remove set methods:
    ①setMapFillFactor
    ②setMapIdleFactor
    ③setExpandFactor
    ④setShrinkFactor
    ⑤setAutoShrink
    
    * Repair shrinkage conditions: ①newCapacity must be a power of 2; 
②reduce unnecessary shrinkage;
    
    * Repair shrinkage conditions
    
    * add shrinkage when clear
    
    * 1.add test for clear shrink
    2. fix initCapacity value
---
 .../bookkeeper/bookie/storage/ldb/ReadCache.java   |   6 +-
 .../bookkeeper/bookie/storage/ldb/WriteCache.java  |   6 +-
 .../collections/ConcurrentLongLongPairHashMap.java | 152 +++++++++++++++++++--
 .../ConcurrentLongLongPairHashMapTest.java         | 114 ++++++++++++++--
 4 files changed, 246 insertions(+), 32 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
index f4038b4..50f4a3b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
@@ -77,7 +77,11 @@ public class ReadCache implements Closeable {
 
         for (int i = 0; i < segmentsCount; i++) {
             cacheSegments.add(Unpooled.directBuffer(segmentSize, segmentSize));
-            cacheIndexes.add(new ConcurrentLongLongPairHashMap(4096, 2 * 
Runtime.getRuntime().availableProcessors()));
+            ConcurrentLongLongPairHashMap concurrentLongLongPairHashMap = 
ConcurrentLongLongPairHashMap.newBuilder()
+                    .expectedItems(4096)
+                    .concurrencyLevel(2 * 
Runtime.getRuntime().availableProcessors())
+                    .build();
+            cacheIndexes.add(concurrentLongLongPairHashMap);
         }
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
index 66eeb3b..fedb61a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
@@ -60,8 +60,10 @@ public class WriteCache implements Closeable {
         void accept(long ledgerId, long entryId, ByteBuf entry);
     }
 
-    private final ConcurrentLongLongPairHashMap index =
-            new ConcurrentLongLongPairHashMap(4096, 2 * 
Runtime.getRuntime().availableProcessors());
+    private final ConcurrentLongLongPairHashMap index = 
ConcurrentLongLongPairHashMap.newBuilder()
+            .expectedItems(4096)
+            .concurrencyLevel(2 * Runtime.getRuntime().availableProcessors())
+            .build();
 
     private final ConcurrentLongLongHashMap lastEntryMap =
             new ConcurrentLongLongHashMap(4096, 2 * 
Runtime.getRuntime().availableProcessors());
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
index 42cb04b..6775610 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
@@ -47,13 +47,77 @@ public class ConcurrentLongLongPairHashMap {
 
     private static final long ValueNotFound = -1L;
 
-    private static final float MapFillFactor = 0.66f;
 
     private static final int DefaultExpectedItems = 256;
     private static final int DefaultConcurrencyLevel = 16;
 
+    private static final float DefaultMapFillFactor = 0.66f;
+    private static final float DefaultMapIdleFactor = 0.15f;
+
+    private static final float DefaultExpandFactor = 2;
+    private static final float DefaultShrinkFactor = 2;
+
+    private static final boolean DefaultAutoShrink = false;
+
     private final Section[] sections;
 
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder of ConcurrentLongLongPairHashMap.
+     */
+    public static class Builder {
+        int expectedItems = DefaultExpectedItems;
+        int concurrencyLevel = DefaultConcurrencyLevel;
+        float mapFillFactor = DefaultMapFillFactor;
+        float mapIdleFactor = DefaultMapIdleFactor;
+        float expandFactor = DefaultExpandFactor;
+        float shrinkFactor = DefaultShrinkFactor;
+        boolean autoShrink = DefaultAutoShrink;
+
+        public Builder expectedItems(int expectedItems) {
+            this.expectedItems = expectedItems;
+            return this;
+        }
+
+        public Builder concurrencyLevel(int concurrencyLevel) {
+            this.concurrencyLevel = concurrencyLevel;
+            return this;
+        }
+
+        public Builder mapFillFactor(float mapFillFactor) {
+            this.mapFillFactor = mapFillFactor;
+            return this;
+        }
+
+        public Builder mapIdleFactor(float mapIdleFactor) {
+            this.mapIdleFactor = mapIdleFactor;
+            return this;
+        }
+
+        public Builder expandFactor(float expandFactor) {
+            this.expandFactor = expandFactor;
+            return this;
+        }
+
+        public Builder shrinkFactor(float shrinkFactor) {
+            this.shrinkFactor = shrinkFactor;
+            return this;
+        }
+
+        public Builder autoShrink(boolean autoShrink) {
+            this.autoShrink = autoShrink;
+            return this;
+        }
+
+        public ConcurrentLongLongPairHashMap build() {
+            return new ConcurrentLongLongPairHashMap(expectedItems, 
concurrencyLevel,
+                    mapFillFactor, mapIdleFactor, autoShrink, expandFactor, 
shrinkFactor);
+        }
+    }
+
     /**
      * A BiConsumer Long pair.
      */
@@ -75,26 +139,42 @@ public class ConcurrentLongLongPairHashMap {
         boolean test(long key1, long key2, long value1, long value2);
     }
 
+    @Deprecated
     public ConcurrentLongLongPairHashMap() {
         this(DefaultExpectedItems);
     }
 
+    @Deprecated
     public ConcurrentLongLongPairHashMap(int expectedItems) {
         this(expectedItems, DefaultConcurrencyLevel);
     }
 
+    @Deprecated
     public ConcurrentLongLongPairHashMap(int expectedItems, int 
concurrencyLevel) {
+        this(expectedItems, concurrencyLevel, DefaultMapFillFactor, 
DefaultMapIdleFactor,
+                DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+    }
+
+    private ConcurrentLongLongPairHashMap(int expectedItems, int 
concurrencyLevel,
+                                          float mapFillFactor, float 
mapIdleFactor,
+                                         boolean autoShrink, float 
expandFactor, float shrinkFactor) {
         checkArgument(expectedItems > 0);
         checkArgument(concurrencyLevel > 0);
         checkArgument(expectedItems >= concurrencyLevel);
+        checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+        checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+        checkArgument(mapFillFactor > mapIdleFactor);
+        checkArgument(expandFactor > 1);
+        checkArgument(shrinkFactor > 1);
 
         int numSections = concurrencyLevel;
         int perSectionExpectedItems = expectedItems / numSections;
-        int perSectionCapacity = (int) (perSectionExpectedItems / 
MapFillFactor);
+        int perSectionCapacity = (int) (perSectionExpectedItems / 
mapFillFactor);
         this.sections = new Section[numSections];
 
         for (int i = 0; i < numSections; i++) {
-            sections[i] = new Section(perSectionCapacity);
+            sections[i] = new Section(perSectionCapacity, mapFillFactor, 
mapIdleFactor,
+                    autoShrink, expandFactor, shrinkFactor);
         }
     }
 
@@ -226,16 +306,31 @@ public class ConcurrentLongLongPairHashMap {
         private volatile long[] table;
 
         private volatile int capacity;
+        private final int initCapacity;
         private volatile int size;
         private int usedBuckets;
-        private int resizeThreshold;
-
-        Section(int capacity) {
+        private int resizeThresholdUp;
+        private int resizeThresholdBelow;
+        private final float mapFillFactor;
+        private final float mapIdleFactor;
+        private final float expandFactor;
+        private final float shrinkFactor;
+        private final boolean autoShrink;
+
+        Section(int capacity, float mapFillFactor, float mapIdleFactor, 
boolean autoShrink,
+                float expandFactor, float shrinkFactor) {
             this.capacity = alignToPowerOfTwo(capacity);
+            this.initCapacity = this.capacity;
             this.table = new long[4 * this.capacity];
             this.size = 0;
             this.usedBuckets = 0;
-            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+            this.autoShrink = autoShrink;
+            this.mapFillFactor = mapFillFactor;
+            this.mapIdleFactor = mapIdleFactor;
+            this.expandFactor = expandFactor;
+            this.shrinkFactor = shrinkFactor;
+            this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+            this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
             Arrays.fill(table, EmptyKey);
         }
 
@@ -336,9 +431,11 @@ public class ConcurrentLongLongPairHashMap {
                     bucket = (bucket + 4) & (table.length - 1);
                 }
             } finally {
-                if (usedBuckets > resizeThreshold) {
+                if (usedBuckets > resizeThresholdUp) {
                     try {
-                        rehash();
+                        // Expand the hashmap
+                        int newCapacity = alignToPowerOfTwo((int) (capacity * 
expandFactor));
+                        rehash(newCapacity);
                     } finally {
                         unlockWrite(stamp);
                     }
@@ -376,7 +473,20 @@ public class ConcurrentLongLongPairHashMap {
                 }
 
             } finally {
-                unlockWrite(stamp);
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / 
shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * 
mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > 
size) {
+                            // shrink the hashmap
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
             }
         }
 
@@ -388,6 +498,18 @@ public class ConcurrentLongLongPairHashMap {
                 table[bucket + 2] = ValueNotFound;
                 table[bucket + 3] = ValueNotFound;
                 --usedBuckets;
+
+                // Cleanup all the buckets that were in `DeletedKey` state, so 
that we can reduce unnecessary expansions
+                bucket = (bucket - 4) & (table.length - 1);
+                while (table[bucket] == DeletedKey) {
+                    table[bucket] = EmptyKey;
+                    table[bucket + 1] = EmptyKey;
+                    table[bucket + 2] = ValueNotFound;
+                    table[bucket + 3] = ValueNotFound;
+                    --usedBuckets;
+
+                    bucket = (bucket - 4) & (table.length - 1);
+                }
             } else {
                 table[bucket] = DeletedKey;
                 table[bucket + 1] = DeletedKey;
@@ -403,6 +525,9 @@ public class ConcurrentLongLongPairHashMap {
                 Arrays.fill(table, EmptyKey);
                 this.size = 0;
                 this.usedBuckets = 0;
+                if (autoShrink) {
+                    rehash(initCapacity);
+                }
             } finally {
                 unlockWrite(stamp);
             }
@@ -453,9 +578,7 @@ public class ConcurrentLongLongPairHashMap {
             }
         }
 
-        private void rehash() {
-            // Expand the hashmap
-            int newCapacity = capacity * 2;
+        private void rehash(int newCapacity) {
             long[] newTable = new long[4 * newCapacity];
             Arrays.fill(newTable, EmptyKey);
 
@@ -475,7 +598,8 @@ public class ConcurrentLongLongPairHashMap {
             // Capacity needs to be updated after the values, so that we won't 
see
             // a capacity value bigger than the actual array size
             capacity = newCapacity;
-            resizeThreshold = (int) (capacity * MapFillFactor);
+            resizeThresholdUp = (int) (capacity * mapFillFactor);
+            resizeThresholdBelow = (int) (capacity * mapIdleFactor);
         }
 
         private static void insertKeyValueNoLock(long[] table, int capacity, 
long key1, long key2, long value1,
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
index f8bfd52..b56bd31 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
@@ -48,21 +48,29 @@ public class ConcurrentLongLongPairHashMapTest {
     @Test
     public void testConstructor() {
         try {
-            new ConcurrentLongLongPairHashMap(0);
+             ConcurrentLongLongPairHashMap.newBuilder()
+                    .expectedItems(0)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentLongLongPairHashMap(16, 0);
+            ConcurrentLongLongPairHashMap.newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(0)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentLongLongPairHashMap(4, 8);
+            ConcurrentLongLongPairHashMap.newBuilder()
+                    .expectedItems(4)
+                    .concurrencyLevel(8)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
@@ -71,8 +79,9 @@ public class ConcurrentLongLongPairHashMapTest {
 
     @Test
     public void simpleInsertions() {
-        ConcurrentLongLongPairHashMap map = new 
ConcurrentLongLongPairHashMap(16);
-
+        ConcurrentLongLongPairHashMap map = 
ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(16)
+                .build();
         assertTrue(map.isEmpty());
         assertTrue(map.put(1, 1, 11, 11));
         assertFalse(map.isEmpty());
@@ -99,7 +108,9 @@ public class ConcurrentLongLongPairHashMapTest {
 
     @Test
     public void testRemove() {
-        ConcurrentLongLongPairHashMap map = new 
ConcurrentLongLongPairHashMap();
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap
+                .newBuilder()
+                .build();
 
         assertTrue(map.isEmpty());
         assertTrue(map.put(1, 1, 11, 11));
@@ -114,8 +125,63 @@ public class ConcurrentLongLongPairHashMapTest {
     }
 
     @Test
+    public void testClear() {
+        ConcurrentLongLongPairHashMap map = 
ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertTrue(map.put(1, 1, 11, 11));
+        assertTrue(map.put(2, 2, 22, 22));
+        assertTrue(map.put(3, 3, 33, 33));
+
+        assertTrue(map.capacity() == 8);
+        map.clear();
+        assertTrue(map.capacity() == 4);
+    }
+
+    @Test
+    public void testExpandAndShrink() {
+        ConcurrentLongLongPairHashMap map = 
ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.put(1, 1, 11, 11));
+        assertTrue(map.put(2, 2, 22, 22));
+        assertTrue(map.put(3, 3, 33, 33));
+
+        // expand hashmap
+        assertTrue(map.capacity() == 8);
+
+        assertTrue(map.remove(1, 1, 11, 11));
+        // not shrink
+        assertTrue(map.capacity() == 8);
+        assertTrue(map.remove(2, 2, 22, 22));
+        // shrink hashmap
+        assertTrue(map.capacity() == 4);
+
+        // expand hashmap
+        assertTrue(map.put(4, 4, 44, 44));
+        assertTrue(map.put(5, 5, 55, 55));
+        assertTrue(map.capacity() == 8);
+
+        //verify that the map does not keep shrinking at every remove() 
operation
+        assertTrue(map.put(6, 6, 66, 66));
+        assertTrue(map.remove(6, 6, 66, 66));
+        assertTrue(map.capacity() == 8);
+    }
+
+    @Test
     public void testNegativeUsedBucketCount() {
-        ConcurrentLongLongPairHashMap map = new 
ConcurrentLongLongPairHashMap(16, 1);
+        ConcurrentLongLongPairHashMap map = 
ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
 
         map.put(0, 0, 0, 0);
         assertEquals(1, map.getUsedBucketCount());
@@ -130,7 +196,10 @@ public class ConcurrentLongLongPairHashMapTest {
     @Test
     public void testRehashing() {
         int n = 16;
-        ConcurrentLongLongPairHashMap map = new 
ConcurrentLongLongPairHashMap(n / 2, 1);
+        ConcurrentLongLongPairHashMap map = 
ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(map.capacity(), n);
         assertEquals(map.size(), 0);
 
@@ -145,7 +214,10 @@ public class ConcurrentLongLongPairHashMapTest {
     @Test
     public void testRehashingWithDeletes() {
         int n = 16;
-        ConcurrentLongLongPairHashMap map = new 
ConcurrentLongLongPairHashMap(n / 2, 1);
+        ConcurrentLongLongPairHashMap map = 
ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(map.capacity(), n);
         assertEquals(map.size(), 0);
 
@@ -167,7 +239,8 @@ public class ConcurrentLongLongPairHashMapTest {
 
     @Test
     public void concurrentInsertions() throws Throwable {
-        ConcurrentLongLongPairHashMap map = new 
ConcurrentLongLongPairHashMap();
+        ConcurrentLongLongPairHashMap map = 
ConcurrentLongLongPairHashMap.newBuilder()
+                .build();
         ExecutorService executor = Executors.newCachedThreadPool();
 
         final int nThreads = 16;
@@ -206,7 +279,8 @@ public class ConcurrentLongLongPairHashMapTest {
 
     @Test
     public void concurrentInsertionsAndReads() throws Throwable {
-        ConcurrentLongLongPairHashMap map = new 
ConcurrentLongLongPairHashMap();
+        ConcurrentLongLongPairHashMap map = 
ConcurrentLongLongPairHashMap.newBuilder()
+                .build();
         ExecutorService executor = Executors.newCachedThreadPool();
 
         final int nThreads = 16;
@@ -245,7 +319,8 @@ public class ConcurrentLongLongPairHashMapTest {
 
     @Test
     public void testIteration() {
-        ConcurrentLongLongPairHashMap map = new 
ConcurrentLongLongPairHashMap();
+        ConcurrentLongLongPairHashMap map = 
ConcurrentLongLongPairHashMap.newBuilder()
+                .build();
 
         assertEquals(map.keys(), Collections.emptyList());
         assertEquals(map.values(), Collections.emptyList());
@@ -288,7 +363,9 @@ public class ConcurrentLongLongPairHashMapTest {
 
     @Test
     public void testPutIfAbsent() {
-        ConcurrentLongLongPairHashMap map = new 
ConcurrentLongLongPairHashMap();
+        ConcurrentLongLongPairHashMap map = 
ConcurrentLongLongPairHashMap.newBuilder()
+                .build();
+
         assertTrue(map.putIfAbsent(1, 1, 11, 11));
         assertEquals(map.get(1, 1), new LongPair(11, 11));
 
@@ -298,7 +375,11 @@ public class ConcurrentLongLongPairHashMapTest {
 
     @Test
     public void testIvalidKeys() {
-        ConcurrentLongLongPairHashMap map = new 
ConcurrentLongLongPairHashMap(16, 1);
+        ConcurrentLongLongPairHashMap map = 
ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
+
 
         try {
             map.put(-5, 3, 4, 4);
@@ -331,7 +412,10 @@ public class ConcurrentLongLongPairHashMapTest {
 
     @Test
     public void testAsMap() {
-        ConcurrentLongLongPairHashMap lmap = new 
ConcurrentLongLongPairHashMap(16, 1);
+        ConcurrentLongLongPairHashMap lmap = 
ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
         lmap.put(1, 1, 11, 11);
         lmap.put(2, 2, 22, 22);
         lmap.put(3, 3, 33, 33);

Reply via email to