This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2503c67c8453f150a751ede182445dabb90466d6
Author: xiarui <[email protected]>
AuthorDate: Thu Jan 16 15:28:00 2025 +0800

    [FLINK-37028][State] Introduce namespace wrapped state for window operator.
---
 .../AggregatingStateWithDeclaredNamespace.java     | 105 ++++++++++++++
 .../state/ListStateWithDeclaredNamespace.java      | 107 ++++++++++++++
 .../state/MapStateWithDeclaredNamespace.java       | 158 +++++++++++++++++++++
 .../state/ReducingStateWithDeclaredNamespace.java  | 105 ++++++++++++++
 .../declare/state/StateWithDeclaredNamespace.java  |  95 +++++++++++++
 .../state/ValueStateWithDeclaredNamespace.java     |  70 +++++++++
 6 files changed, 640 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/AggregatingStateWithDeclaredNamespace.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/AggregatingStateWithDeclaredNamespace.java
new file mode 100644
index 000000000000..e38e0aa031e9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/AggregatingStateWithDeclaredNamespace.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing.declare.state;
+
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable;
+import org.apache.flink.runtime.state.v2.internal.InternalAggregatingState;
+
+import java.util.Collection;
+
+/** Aggregating state wrapped with declared namespace. */
+class AggregatingStateWithDeclaredNamespace<K, N, IN, ACC, OUT>
+        extends StateWithDeclaredNamespace<K, N, ACC>
+        implements InternalAggregatingState<K, N, IN, ACC, OUT> {
+    private final InternalAggregatingState<K, N, IN, ACC, OUT> state;
+
+    public AggregatingStateWithDeclaredNamespace(
+            InternalAggregatingState<K, N, IN, ACC, OUT> state,
+            DeclaredVariable<N> declaredNamespace) {
+        super(state, declaredNamespace);
+        this.state = state;
+    }
+
+    @Override
+    public StateFuture<OUT> asyncGet() {
+        resetNamespace();
+        return state.asyncGet();
+    }
+
+    @Override
+    public StateFuture<Void> asyncAdd(IN value) {
+        resetNamespace();
+        return state.asyncAdd(value);
+    }
+
+    @Override
+    public StateFuture<Void> asyncClear() {
+        resetNamespace();
+        return state.asyncClear();
+    }
+
+    @Override
+    public StateFuture<Void> asyncMergeNamespaces(N target, Collection<N> 
sources) {
+        resetNamespace();
+        return state.asyncMergeNamespaces(target, sources);
+    }
+
+    @Override
+    public StateFuture<ACC> asyncGetInternal() {
+        resetNamespace();
+        return state.asyncGetInternal();
+    }
+
+    @Override
+    public StateFuture<Void> asyncUpdateInternal(ACC valueToStore) {
+        resetNamespace();
+        return state.asyncUpdateInternal(valueToStore);
+    }
+
+    @Override
+    public OUT get() {
+        return state.get();
+    }
+
+    @Override
+    public void add(IN value) {
+        state.add(value);
+    }
+
+    @Override
+    public void clear() {
+        state.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) {
+        state.mergeNamespaces(target, sources);
+    }
+
+    @Override
+    public ACC getInternal() {
+        return state.getInternal();
+    }
+
+    @Override
+    public void updateInternal(ACC valueToStore) {
+        state.updateInternal(valueToStore);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/ListStateWithDeclaredNamespace.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/ListStateWithDeclaredNamespace.java
new file mode 100644
index 000000000000..2a2b0798b657
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/ListStateWithDeclaredNamespace.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing.declare.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.state.v2.StateIterator;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable;
+import org.apache.flink.runtime.state.v2.internal.InternalListState;
+
+import java.util.Collection;
+import java.util.List;
+
+/** ListState wrapped with declared namespace. */
+@Internal
+class ListStateWithDeclaredNamespace<K, N, V> extends 
StateWithDeclaredNamespace<K, N, V>
+        implements InternalListState<K, N, V> {
+    private final InternalListState<K, N, V> state;
+
+    public ListStateWithDeclaredNamespace(
+            InternalListState<K, N, V> state, DeclaredVariable<N> 
declaredNamespace) {
+        super(state, declaredNamespace);
+        this.state = state;
+    }
+
+    @Override
+    public StateFuture<StateIterator<V>> asyncGet() {
+        resetNamespace();
+        return state.asyncGet();
+    }
+
+    @Override
+    public StateFuture<Void> asyncAdd(V value) {
+        resetNamespace();
+        return state.asyncAdd(value);
+    }
+
+    @Override
+    public StateFuture<Void> asyncUpdate(List<V> values) {
+        resetNamespace();
+        return state.asyncUpdate(values);
+    }
+
+    @Override
+    public StateFuture<Void> asyncAddAll(List<V> values) {
+        resetNamespace();
+        return state.asyncAddAll(values);
+    }
+
+    @Override
+    public StateFuture<Void> asyncClear() {
+        resetNamespace();
+        return state.asyncClear();
+    }
+
+    @Override
+    public StateFuture<Void> asyncMergeNamespaces(N target, Collection<N> 
sources) {
+        resetNamespace();
+        return state.asyncMergeNamespaces(target, sources);
+    }
+
+    @Override
+    public Iterable<V> get() {
+        return state.get();
+    }
+
+    @Override
+    public void add(V value) {
+        state.add(value);
+    }
+
+    @Override
+    public void update(List<V> values) {
+        state.update(values);
+    }
+
+    @Override
+    public void addAll(List<V> values) {
+        state.addAll(values);
+    }
+
+    @Override
+    public void clear() {
+        state.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) {
+        state.mergeNamespaces(target, sources);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/MapStateWithDeclaredNamespace.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/MapStateWithDeclaredNamespace.java
new file mode 100644
index 000000000000..b5914195c922
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/MapStateWithDeclaredNamespace.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing.declare.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.state.v2.StateIterator;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable;
+import org.apache.flink.runtime.state.v2.internal.InternalMapState;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/** MapState wrapped with declared namespace. */
+@Internal
+class MapStateWithDeclaredNamespace<K, N, UK, UV> extends 
StateWithDeclaredNamespace<K, N, UV>
+        implements InternalMapState<K, N, UK, UV> {
+
+    private final InternalMapState<K, N, UK, UV> state;
+
+    public MapStateWithDeclaredNamespace(
+            InternalMapState<K, N, UK, UV> state, DeclaredVariable<N> 
declaredNamespace) {
+        super(state, declaredNamespace);
+        this.state = state;
+    }
+
+    @Override
+    public StateFuture<UV> asyncGet(UK key) {
+        resetNamespace();
+        return state.asyncGet(key);
+    }
+
+    @Override
+    public StateFuture<Void> asyncPut(UK key, UV value) {
+        resetNamespace();
+        return state.asyncPut(key, value);
+    }
+
+    @Override
+    public StateFuture<Void> asyncPutAll(Map<UK, UV> map) {
+        resetNamespace();
+        return state.asyncPutAll(map);
+    }
+
+    @Override
+    public StateFuture<Void> asyncRemove(UK key) {
+        resetNamespace();
+        return state.asyncRemove(key);
+    }
+
+    @Override
+    public StateFuture<Boolean> asyncContains(UK key) {
+        resetNamespace();
+        return state.asyncContains(key);
+    }
+
+    @Override
+    public StateFuture<StateIterator<Entry<UK, UV>>> asyncEntries() {
+        resetNamespace();
+        return state.asyncEntries();
+    }
+
+    @Override
+    public StateFuture<StateIterator<UK>> asyncKeys() {
+        resetNamespace();
+        return state.asyncKeys();
+    }
+
+    @Override
+    public StateFuture<StateIterator<UV>> asyncValues() {
+        resetNamespace();
+        return state.asyncValues();
+    }
+
+    @Override
+    public StateFuture<Boolean> asyncIsEmpty() {
+        resetNamespace();
+        return state.asyncIsEmpty();
+    }
+
+    @Override
+    public StateFuture<Void> asyncClear() {
+        resetNamespace();
+        return state.asyncClear();
+    }
+
+    @Override
+    public UV get(UK key) {
+        return state.get(key);
+    }
+
+    @Override
+    public void put(UK key, UV value) {
+        state.put(key, value);
+    }
+
+    @Override
+    public void putAll(Map<UK, UV> map) {
+        state.putAll(map);
+    }
+
+    @Override
+    public void remove(UK key) {
+        state.remove(key);
+    }
+
+    @Override
+    public boolean contains(UK key) {
+        return state.contains(key);
+    }
+
+    @Override
+    public Iterable<Entry<UK, UV>> entries() {
+        return state.entries();
+    }
+
+    @Override
+    public Iterable<UK> keys() {
+        return state.keys();
+    }
+
+    @Override
+    public Iterable<UV> values() {
+        return state.values();
+    }
+
+    @Override
+    public Iterator<Entry<UK, UV>> iterator() {
+        return state.iterator();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return state.isEmpty();
+    }
+
+    @Override
+    public void clear() {
+        state.clear();
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/ReducingStateWithDeclaredNamespace.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/ReducingStateWithDeclaredNamespace.java
new file mode 100644
index 000000000000..e9ab82eb7ffc
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/ReducingStateWithDeclaredNamespace.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing.declare.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable;
+import org.apache.flink.runtime.state.v2.internal.InternalReducingState;
+
+import java.util.Collection;
+
+/** Reducing state wrapped with declared namespace. */
+@Internal
+class ReducingStateWithDeclaredNamespace<K, N, T> extends 
StateWithDeclaredNamespace<K, N, T>
+        implements InternalReducingState<K, N, T> {
+    private final InternalReducingState<K, N, T> state;
+
+    public ReducingStateWithDeclaredNamespace(
+            InternalReducingState<K, N, T> state, DeclaredVariable<N> 
declaredNamespace) {
+        super(state, declaredNamespace);
+        this.state = state;
+    }
+
+    @Override
+    public StateFuture<T> asyncGet() {
+        resetNamespace();
+        return state.asyncGet();
+    }
+
+    @Override
+    public StateFuture<Void> asyncAdd(T value) {
+        resetNamespace();
+        return state.asyncAdd(value);
+    }
+
+    @Override
+    public StateFuture<Void> asyncClear() {
+        resetNamespace();
+        return state.asyncClear();
+    }
+
+    @Override
+    public StateFuture<T> asyncGetInternal() {
+        resetNamespace();
+        return state.asyncGetInternal();
+    }
+
+    @Override
+    public StateFuture<Void> asyncUpdateInternal(T valueToStore) {
+        resetNamespace();
+        return state.asyncUpdateInternal(valueToStore);
+    }
+
+    @Override
+    public StateFuture<Void> asyncMergeNamespaces(N target, Collection<N> 
sources) {
+        resetNamespace();
+        return state.asyncMergeNamespaces(target, sources);
+    }
+
+    @Override
+    public T get() {
+        return state.get();
+    }
+
+    @Override
+    public void add(T value) {
+        state.add(value);
+    }
+
+    @Override
+    public void clear() {
+        state.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) {
+        state.mergeNamespaces(target, sources);
+    }
+
+    @Override
+    public T getInternal() {
+        return state.getInternal();
+    }
+
+    @Override
+    public void updateInternal(T valueToStore) {
+        state.updateInternal(valueToStore);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/StateWithDeclaredNamespace.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/StateWithDeclaredNamespace.java
new file mode 100644
index 000000000000..969cc2297700
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/StateWithDeclaredNamespace.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing.declare.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable;
+import org.apache.flink.runtime.state.v2.internal.InternalAggregatingState;
+import org.apache.flink.runtime.state.v2.internal.InternalKeyedState;
+import org.apache.flink.runtime.state.v2.internal.InternalListState;
+import org.apache.flink.runtime.state.v2.internal.InternalMapState;
+import org.apache.flink.runtime.state.v2.internal.InternalReducingState;
+import org.apache.flink.runtime.state.v2.internal.InternalValueState;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A partitioned state that wraps a declared namespace and hide the namespace 
switching from user.
+ * User will only use the state just like the public state APIs without any 
consideration on
+ * namespace.
+ *
+ * <p>This wrap class is useful in DataStream window operation, where 
namespace is managed by the
+ * operator and user function is free from namespace manipulation.
+ */
+@Internal
+public abstract class StateWithDeclaredNamespace<K, N, V> implements 
InternalKeyedState<K, N, V> {
+    @Nonnull private final InternalKeyedState<K, N, V> state;
+    @Nonnull private final DeclaredVariable<N> declaredNamespace;
+
+    public StateWithDeclaredNamespace(
+            @Nonnull InternalKeyedState<K, N, V> state,
+            @Nonnull DeclaredVariable<N> declaredNamespace) {
+        Preconditions.checkNotNull(state);
+        Preconditions.checkNotNull(declaredNamespace);
+
+        this.state = state;
+        this.declaredNamespace = declaredNamespace;
+    }
+
+    @Override
+    public void setCurrentNamespace(N namespace) {
+        declaredNamespace.set(namespace);
+        state.setCurrentNamespace(namespace);
+    }
+
+    /** Automatically called before any async state access. */
+    protected void resetNamespace() {
+        state.setCurrentNamespace(declaredNamespace.get());
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <N, S extends State> S create(S state, DeclaredVariable<N> 
declaredNamespace) {
+        if (state instanceof InternalReducingState) {
+            return (S)
+                    new ReducingStateWithDeclaredNamespace<>(
+                            (InternalReducingState<?, N, ?>) state, 
declaredNamespace);
+        } else if (state instanceof InternalAggregatingState) {
+            return (S)
+                    new AggregatingStateWithDeclaredNamespace<>(
+                            (InternalAggregatingState<?, N, ?, ?, ?>) state, 
declaredNamespace);
+        } else if (state instanceof InternalValueState) {
+            return (S)
+                    new ValueStateWithDeclaredNamespace<>(
+                            (InternalValueState<?, N, ?>) state, 
declaredNamespace);
+        } else if (state instanceof InternalMapState) {
+            return (S)
+                    new MapStateWithDeclaredNamespace<>(
+                            (InternalMapState<?, N, ?, ?>) state, 
declaredNamespace);
+        } else if (state instanceof InternalListState) {
+            return (S)
+                    new ListStateWithDeclaredNamespace<>(
+                            (InternalListState<?, N, ?>) state, 
declaredNamespace);
+        } else {
+            throw new IllegalArgumentException(
+                    "Unsupported state type: " + state.getClass().getName());
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/ValueStateWithDeclaredNamespace.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/ValueStateWithDeclaredNamespace.java
new file mode 100644
index 000000000000..06545d818f3a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/state/ValueStateWithDeclaredNamespace.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing.declare.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable;
+import org.apache.flink.runtime.state.v2.internal.InternalValueState;
+
+/** Value state wrapped with declared namespace. */
+@Internal
+class ValueStateWithDeclaredNamespace<K, N, V> extends 
StateWithDeclaredNamespace<K, N, V>
+        implements InternalValueState<K, N, V> {
+    private final InternalValueState<K, N, V> state;
+
+    public ValueStateWithDeclaredNamespace(
+            InternalValueState<K, N, V> state, DeclaredVariable<N> 
declaredNamespace) {
+        super(state, declaredNamespace);
+        this.state = state;
+    }
+
+    @Override
+    public StateFuture<Void> asyncClear() {
+        resetNamespace();
+        return state.asyncClear();
+    }
+
+    @Override
+    public StateFuture<V> asyncValue() {
+        resetNamespace();
+        return state.asyncValue();
+    }
+
+    @Override
+    public StateFuture<Void> asyncUpdate(V value) {
+        resetNamespace();
+        return state.asyncUpdate(value);
+    }
+
+    @Override
+    public void clear() {
+        state.clear();
+    }
+
+    @Override
+    public V value() {
+        return state.value();
+    }
+
+    @Override
+    public void update(V value) {
+        state.update(value);
+    }
+}

Reply via email to