This is an automated email from the ASF dual-hosted git repository.

bharathv pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new 33f175f  HBASE-25984: Avoid premature reuse of sync futures in FSHLog 
(#3371) (#3394)
33f175f is described below

commit 33f175fa03bdcb1227a86f4d4361d137a453e844
Author: Bharath Vissapragada <[email protected]>
AuthorDate: Thu Jun 17 12:29:02 2021 -0700

    HBASE-25984: Avoid premature reuse of sync futures in FSHLog (#3371) (#3394)
    
    Signed-off-by: Viraj Jasani <[email protected]>
    (cherry picked from commit 5a19bcfa98b3ccd9f7fb1fb933248c808676d91c)
---
 .../hbase/regionserver/wal/AbstractFSWAL.java      | 22 ++----
 .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java  | 14 +++-
 .../hadoop/hbase/regionserver/wal/FSHLog.java      | 23 +++++-
 .../hadoop/hbase/regionserver/wal/SyncFuture.java  | 12 ++-
 .../hbase/regionserver/wal/SyncFutureCache.java    | 74 ++++++++++++++++++
 .../hadoop/hbase/regionserver/wal/TestFSHLog.java  | 91 ++++++++++++++++++++++
 .../regionserver/wal/TestSyncFutureCache.java      | 68 ++++++++++++++++
 7 files changed, 284 insertions(+), 20 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index c017e9f..7bf20cc 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -295,11 +295,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> 
implements WAL {
     new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);
 
   /**
-   * Map of {@link SyncFuture}s owned by Thread objects. Used so we reuse 
SyncFutures.
-   * Thread local is used so JVM can GC the terminated thread for us. See 
HBASE-21228
-   * <p>
+   * A cache of sync futures reused by threads.
    */
-  private final ThreadLocal<SyncFuture> cachedSyncFutures;
+  protected final SyncFutureCache syncFutureCache;
 
   /**
    * The class name of the runtime implementation, used as prefix for 
logging/tracing.
@@ -455,12 +453,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> 
implements WAL {
       DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
     this.walSyncTimeoutNs = 
TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS,
       conf.getLong("hbase.regionserver.hlog.sync.timeout", 
DEFAULT_WAL_SYNC_TIMEOUT_MS)));
-    this.cachedSyncFutures = new ThreadLocal<SyncFuture>() {
-      @Override
-      protected SyncFuture initialValue() {
-        return new SyncFuture();
-      }
-    };
+    this.syncFutureCache = new SyncFutureCache(conf);
     this.implClassName = getClass().getSimpleName();
     this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, 
HRegion.DEFAULT_WAL_HSYNC);
   }
@@ -753,10 +746,6 @@ public abstract class AbstractFSWAL<W extends WriterBase> 
implements WAL {
         }
       }
     } catch (TimeoutIOException tioe) {
-      // SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer
-      // still refer to it, so if this thread use it next time may get a wrong
-      // result.
-      this.cachedSyncFutures.remove();
       throw tioe;
     } catch (InterruptedException ie) {
       LOG.warn("Interrupted", ie);
@@ -859,6 +848,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> 
implements WAL {
     rollWriterLock.lock();
     try {
       doShutdown();
+      if (syncFutureCache != null) {
+        syncFutureCache.clear();
+      }
     } finally {
       rollWriterLock.unlock();
     }
@@ -905,7 +897,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> 
implements WAL {
   }
 
   protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
-    return cachedSyncFutures.get().reset(sequence).setForceSync(forceSync);
+    return 
syncFutureCache.getIfPresentOrNew().reset(sequence).setForceSync(forceSync);
   }
 
   protected boolean isLogRollRequested() {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 3c799bf..c9168e2 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -253,6 +253,14 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> 
{
       DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
   }
 
+  /**
+   * Helper that marks the future as DONE and offers it back to the cache.
+   */
+  private void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable 
t) {
+    future.done(txid, t);
+    syncFutureCache.offer(future);
+  }
+
   private static boolean waitingRoll(int epochAndState) {
     return (epochAndState & 1) != 0;
   }
@@ -393,7 +401,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
       SyncFuture sync = iter.next();
       if (sync.getTxid() <= txid) {
-        sync.done(txid, null);
+        markFutureDoneAndOffer(sync, txid, null);
         iter.remove();
         finished++;
         if (addSyncTrace) {
@@ -415,7 +423,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
         long maxSyncTxid = highestSyncedTxid.get();
         for (SyncFuture sync : syncFutures) {
           maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
-          sync.done(maxSyncTxid, null);
+          markFutureDoneAndOffer(sync, maxSyncTxid, null);
           if (addSyncTrace) {
             addTimeAnnotation(sync, "writer synced");
           }
@@ -751,7 +759,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
       }
     }
     // and fail them
-    syncFutures.forEach(f -> f.done(f.getTxid(), error));
+    syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error));
     if (!(consumeExecutor instanceof EventLoop)) {
       consumeExecutor.shutdown();
     }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 2aae540..9674aca 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -860,6 +860,14 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     }
 
     /**
+     * @return if the safepoint has been attained.
+     */
+    @InterfaceAudience.Private
+    boolean isSafePointAttained() {
+      return this.safePointAttainedLatch.getCount() == 0;
+    }
+
+    /**
      * Called by Thread B when it attains the 'safe point'. In this method, 
Thread B signals Thread
      * A it can proceed. Thread B will be held in here until {@link 
#releaseSafePoint()} is called
      * by Thread A.
@@ -945,6 +953,16 @@ public class FSHLog extends AbstractFSWAL<Writer> {
       for (int i = 0; i < this.syncFuturesCount.get(); i++) {
         this.syncFutures[i].done(sequence, e);
       }
+      offerDoneSyncsBackToCache();
+    }
+
+    /**
+     * Offers the finished syncs back to the cache for reuse.
+     */
+    private void offerDoneSyncsBackToCache() {
+      for (int i = 0; i < this.syncFuturesCount.get(); i++) {
+        syncFutureCache.offer(syncFutures[i]);
+      }
       this.syncFuturesCount.set(0);
     }
 
@@ -1059,7 +1077,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
               ? this.exception : new DamagedWALException("On sync", 
this.exception));
         }
         attainSafePoint(sequence);
-        this.syncFuturesCount.set(0);
+        // It is critical that we offer the futures back to the cache for 
reuse here after the
+        // safe point is attained and all the clean up has been done. There 
have been
+        // issues with reusing sync futures early causing WAL lockups, see 
HBASE-25984.
+        offerDoneSyncsBackToCache();
       } catch (Throwable t) {
         LOG.error("UNEXPECTED!!! syncFutures.length=" + 
this.syncFutures.length, t);
       }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
index 650f68c..edba5df 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
@@ -90,7 +90,8 @@ class SyncFuture {
 
   @Override
   public synchronized String toString() {
-    return "done=" + isDone() + ", txid=" + this.txid;
+    return "done=" + isDone() + ", txid=" + this.txid + " threadID=" + 
t.getId() +
+        " threadName=" + t.getName();
   }
 
   synchronized long getTxid() {
@@ -107,6 +108,15 @@ class SyncFuture {
   }
 
   /**
+   * Returns the thread that owned this sync future, use with caution as we 
return the reference to
+   * the actual thread object.
+   * @return the associated thread instance.
+   */
+  Thread getThread() {
+    return t;
+  }
+
+  /**
    * @param txid the transaction id at which this future 'completed'.
    * @param t Can be null. Set if we are 'completing' on error (and this 't' 
is the error).
    * @return True if we successfully marked this outstanding future as 
completed/done. Returns false
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFutureCache.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFutureCache.java
new file mode 100644
index 0000000..9dd2e0a
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFutureCache.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+
+/**
+ * A cache of {@link SyncFuture}s.  This class supports two methods
+ * {@link SyncFutureCache#getIfPresentOrNew()} and {@link 
SyncFutureCache#offer(SyncFuture)}.
+ *
+ * Usage pattern:
+ *   SyncFuture sf = syncFutureCache.getIfPresentOrNew();
+ *   sf.reset(...);
+ *   // Use the sync future
+ *   finally: syncFutureCache.offer(sf);
+ *
+ * Offering the sync future back to the cache makes it eligible for reuse 
within the same thread
+ * context. Cache keyed by the accessing thread instance and automatically 
invalidated if it remains
+ * unused for {@link SyncFutureCache#SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS} 
minutes.
+ */
[email protected]
+public final class SyncFutureCache {
+
+  private static final long SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS = 2;
+
+  private final Cache<Thread, SyncFuture> syncFutureCache;
+
+  public SyncFutureCache(final Configuration conf) {
+    final int handlerCount = 
conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+    syncFutureCache = CacheBuilder.newBuilder().initialCapacity(handlerCount)
+        .expireAfterWrite(SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS, 
TimeUnit.MINUTES).build();
+  }
+
+  public SyncFuture getIfPresentOrNew() {
+    // Invalidate the entry if a mapping exists. We do not want it to be 
reused at the same time.
+    SyncFuture future = syncFutureCache.asMap().remove(Thread.currentThread());
+    return (future == null) ? new SyncFuture() : future;
+  }
+
+  /**
+   * Offers the sync future back to the cache for reuse.
+   */
+  public void offer(SyncFuture syncFuture) {
+    // It is ok to overwrite an existing mapping.
+    syncFutureCache.asMap().put(syncFuture.getThread(), syncFuture);
+  }
+
+  public void clear() {
+    if (syncFutureCache != null) {
+      syncFutureCache.invalidateAll();
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index 0376dfa..30bdea7 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -27,6 +29,7 @@ import java.util.TreeMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -34,6 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -49,8 +53,10 @@ import 
org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -67,6 +73,8 @@ public class TestFSHLog extends AbstractTestFSWAL {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestFSHLog.class);
 
+  private static final long TEST_TIMEOUT_MS = 10000;
+
   @Rule
   public TestName name = new TestName();
 
@@ -132,6 +140,89 @@ public class TestFSHLog extends AbstractTestFSWAL {
   }
 
   /**
+   * Test for WAL stall due to sync future overwrites. See HBASE-25984.
+   */
+  @Test
+  public void testDeadlockWithSyncOverwrites() throws Exception {
+    final CountDownLatch blockBeforeSafePoint = new CountDownLatch(1);
+
+    class FailingWriter implements WALProvider.Writer {
+      @Override public void sync(boolean forceSync) throws IOException {
+        throw new IOException("Injected failure..");
+      }
+
+      @Override public void append(WAL.Entry entry) throws IOException {
+      }
+
+      @Override public long getLength() {
+        return 0;
+      }
+
+      @Override
+      public long getSyncedLength() {
+        return 0;
+      }
+
+      @Override public void close() throws IOException {
+      }
+    }
+
+    /*
+     * Custom FSHLog implementation with a conditional wait before attaining 
safe point.
+     */
+    class CustomFSHLog extends FSHLog {
+      public CustomFSHLog(FileSystem fs, Path rootDir, String logDir, String 
archiveDir,
+                          Configuration conf, List<WALActionsListener> 
listeners, boolean failIfWALExists,
+                          String prefix, String suffix) throws IOException {
+        super(fs, rootDir, logDir, archiveDir, conf, listeners, 
failIfWALExists, prefix, suffix);
+      }
+
+      @Override
+      protected void beforeWaitOnSafePoint() {
+        try {
+          assertTrue(blockBeforeSafePoint.await(TEST_TIMEOUT_MS, 
TimeUnit.MILLISECONDS));
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      public SyncFuture publishSyncOnRingBuffer() {
+        long sequence = getSequenceOnRingBuffer();
+        return publishSyncOnRingBuffer(sequence, false);
+      }
+    }
+
+    final String name = this.name.getMethodName();
+    try (CustomFSHLog log = new CustomFSHLog(FS, 
CommonFSUtils.getRootDir(CONF), name,
+        HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
+      log.setWriter(new FailingWriter());
+      Field ringBufferEventHandlerField =
+          FSHLog.class.getDeclaredField("ringBufferEventHandler");
+      ringBufferEventHandlerField.setAccessible(true);
+      FSHLog.RingBufferEventHandler ringBufferEventHandler =
+          (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
+      // Force a safe point
+      FSHLog.SafePointZigZagLatch latch = 
ringBufferEventHandler.attainSafePoint();
+      try {
+        SyncFuture future0 = log.publishSyncOnRingBuffer();
+        // Wait for the sync to be done.
+        Waiter.waitFor(CONF, TEST_TIMEOUT_MS, future0::isDone);
+        // Publish another sync from the same thread, this should not 
overwrite the done sync.
+        SyncFuture future1 = log.publishSyncOnRingBuffer();
+        assertFalse(future1.isDone());
+        // Unblock the safe point trigger..
+        blockBeforeSafePoint.countDown();
+        // Wait for the safe point to be reached.
+        // With the deadlock in HBASE-25984, this is never possible, thus 
blocking the sync pipeline.
+        Waiter.waitFor(CONF, TEST_TIMEOUT_MS, latch::isSafePointAttained);
+      } finally {
+        // Force release the safe point, for the clean up.
+        latch.releaseSafePoint();
+      }
+    }
+  }
+
+  /**
    * Test case for https://issues.apache.org/jira/browse/HBASE-16721
    */
   @Test
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java
new file mode 100644
index 0000000..070aaf2
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestSyncFutureCache {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestSyncFutureCache.class);
+
+  @Test
+  public void testSyncFutureCacheLifeCycle() throws Exception {
+    final Configuration conf = HBaseConfiguration.create();
+    SyncFutureCache cache = new SyncFutureCache(conf);
+    try {
+      SyncFuture future0 = cache.getIfPresentOrNew().reset(0);
+      assertNotNull(future0);
+      // Get another future from the same thread, should be different one.
+      SyncFuture future1 = cache.getIfPresentOrNew().reset(1);
+      assertNotNull(future1);
+      assertNotSame(future0, future1);
+      cache.offer(future1);
+      // Should override.
+      cache.offer(future0);
+      SyncFuture future3 = cache.getIfPresentOrNew();
+      assertEquals(future3, future0);
+      final SyncFuture[] future4 = new SyncFuture[1];
+      // From a different thread
+      CompletableFuture.runAsync(() -> future4[0] = 
cache.getIfPresentOrNew().reset(4)).get();
+      assertNotNull(future4[0]);
+      assertNotSame(future3, future4[0]);
+      // Clean up
+      cache.offer(future3);
+      cache.offer(future4[0]);
+    } finally {
+      cache.clear();
+    }
+  }
+}

Reply via email to