virajjasani commented on code in PR #1902:
URL: https://github.com/apache/phoenix/pull/1902#discussion_r1631764414


##########
phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/LockManager.java:
##########
@@ -21,224 +21,148 @@
 import java.io.InterruptedIOException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * 
- * Class, copied for the most part from HRegion.getRowLockInternal 
implementation
- * that manages reentrant row locks based on the row key. Phoenix needs to 
manage
- * it's own locking due to secondary indexes needing a consistent snapshot from
+ * Manages reentrant row locks based on row keys. Phoenix needs to manage
+ * its own locking due to secondary indexes needing a consistent snapshot from
  * the time the mvcc is acquired until the time it is advanced (PHOENIX-4053).
  *
  */
 public class LockManager {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LockManager.class);
 
-    private final ConcurrentHashMap<ImmutableBytesPtr, RowLockContext> 
lockedRows =
-            new ConcurrentHashMap<ImmutableBytesPtr, RowLockContext>();
+    private final ConcurrentHashMap<ImmutableBytesPtr, RowLockImpl> lockedRows 
=
+            new ConcurrentHashMap<>();
 
     public LockManager () {
     }
 
     /**
      * Lock the row or throw otherwise
-     * @param rowKey the row key
-     * @return RowLock used to eventually release the lock 
+     * @param rowKey
+     * @param waitDuration
+     * @return
      * @throws TimeoutIOException if the lock could not be acquired within the
      * allowed rowLockWaitDuration and InterruptedException if interrupted 
while
      * waiting to acquire lock.
      */
     public RowLock lockRow(ImmutableBytesPtr rowKey, int waitDuration) throws 
IOException {
-        RowLockContext rowLockContext = null;
-        RowLockImpl result = null;
-        TraceScope traceScope = null;
-
-        // If we're tracing start a span to show how long this took.
-        if (Trace.isTracing()) {
-            traceScope = Trace.startSpan("LockManager.getRowLock");
-            traceScope.getSpan().addTimelineAnnotation("Getting a lock");
-        }
-
-        boolean success = false;
-        try {
-            // Keep trying until we have a lock or error out.
-            // TODO: do we need to add a time component here?
-            while (result == null) {
-
-                // Try adding a RowLockContext to the lockedRows.
-                // If we can add it then there's no other transactions 
currently running.
-                rowLockContext = new RowLockContext(rowKey);
-                RowLockContext existingContext = 
lockedRows.putIfAbsent(rowKey, rowLockContext);
-
-                // if there was a running transaction then there's already a 
context.
-                if (existingContext != null) {
-                    rowLockContext = existingContext;
-                }
-
-                result = rowLockContext.newRowLock();
+        RowLockImpl rowLock = new RowLockImpl(rowKey);
+        while (true) {
+            RowLockImpl existingRowLock = lockedRows.putIfAbsent(rowKey, 
rowLock);
+            if (existingRowLock == null) {
+                // The row was not locked
+                return rowLock;
             }
-            if (!result.getLock().tryLock(waitDuration, 
TimeUnit.MILLISECONDS)) {
-                if (traceScope != null) {
-                    traceScope.getSpan().addTimelineAnnotation("Failed to get 
row lock");
-                }
-                throw new TimeoutIOException("Timed out waiting for lock for 
row: " + rowKey);
-            }
-            rowLockContext.setThreadName(Thread.currentThread().getName());
-            success = true;
-            return result;
-        } catch (InterruptedException ie) {
-            LOGGER.warn("Thread interrupted waiting for lock on row: " + 
rowKey);
-            InterruptedIOException iie = new InterruptedIOException();
-            iie.initCause(ie);
-            if (traceScope != null) {
-                traceScope.getSpan().addTimelineAnnotation("Interrupted 
exception getting row lock");
+            // The row is already locked by a different thread. Wait for the 
lock to be released
+            // for waitDuration time
+            long startTime = EnvironmentEdgeManager.currentTimeMillis();
+            RowLockImpl usableRowLock = existingRowLock.lock(waitDuration);
+            if (usableRowLock != null) {
+                return usableRowLock;
             }
-            Thread.currentThread().interrupt();
-            throw iie;
-        } finally {
-            // On failure, clean up the counts just in case this was the thing 
keeping the context alive.
-            if (!success && rowLockContext != null) rowLockContext.cleanUp();
-            if (traceScope != null) {
-                traceScope.close();
+            // The existing lock was released and removed from the hash map 
before the current
+            // thread attempt to lock
+            long now = EnvironmentEdgeManager.currentTimeMillis();
+            long timePassed = now - startTime;
+            if (timePassed > waitDuration) {
+                throw new TimeoutIOException("Timed out waiting for lock for 
row: " + rowKey);
             }
+            waitDuration -= timePassed;

Review Comment:
   nit: `waitDuration -= (int) timePassed`



##########
phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/LockManager.java:
##########
@@ -21,224 +21,148 @@
 import java.io.InterruptedIOException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * 
- * Class, copied for the most part from HRegion.getRowLockInternal 
implementation
- * that manages reentrant row locks based on the row key. Phoenix needs to 
manage
- * it's own locking due to secondary indexes needing a consistent snapshot from
+ * Manages reentrant row locks based on row keys. Phoenix needs to manage
+ * its own locking due to secondary indexes needing a consistent snapshot from
  * the time the mvcc is acquired until the time it is advanced (PHOENIX-4053).
  *
  */
 public class LockManager {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LockManager.class);
 
-    private final ConcurrentHashMap<ImmutableBytesPtr, RowLockContext> 
lockedRows =
-            new ConcurrentHashMap<ImmutableBytesPtr, RowLockContext>();
+    private final ConcurrentHashMap<ImmutableBytesPtr, RowLockImpl> lockedRows 
=
+            new ConcurrentHashMap<>();
 
     public LockManager () {
     }
 
     /**
      * Lock the row or throw otherwise
-     * @param rowKey the row key
-     * @return RowLock used to eventually release the lock 
+     * @param rowKey
+     * @param waitDuration
+     * @return
      * @throws TimeoutIOException if the lock could not be acquired within the
      * allowed rowLockWaitDuration and InterruptedException if interrupted 
while
      * waiting to acquire lock.
      */
     public RowLock lockRow(ImmutableBytesPtr rowKey, int waitDuration) throws 
IOException {
-        RowLockContext rowLockContext = null;
-        RowLockImpl result = null;
-        TraceScope traceScope = null;
-
-        // If we're tracing start a span to show how long this took.
-        if (Trace.isTracing()) {
-            traceScope = Trace.startSpan("LockManager.getRowLock");
-            traceScope.getSpan().addTimelineAnnotation("Getting a lock");
-        }
-
-        boolean success = false;
-        try {
-            // Keep trying until we have a lock or error out.
-            // TODO: do we need to add a time component here?
-            while (result == null) {
-
-                // Try adding a RowLockContext to the lockedRows.
-                // If we can add it then there's no other transactions 
currently running.
-                rowLockContext = new RowLockContext(rowKey);
-                RowLockContext existingContext = 
lockedRows.putIfAbsent(rowKey, rowLockContext);
-
-                // if there was a running transaction then there's already a 
context.
-                if (existingContext != null) {
-                    rowLockContext = existingContext;
-                }
-
-                result = rowLockContext.newRowLock();
+        RowLockImpl rowLock = new RowLockImpl(rowKey);
+        while (true) {
+            RowLockImpl existingRowLock = lockedRows.putIfAbsent(rowKey, 
rowLock);
+            if (existingRowLock == null) {
+                // The row was not locked
+                return rowLock;
             }
-            if (!result.getLock().tryLock(waitDuration, 
TimeUnit.MILLISECONDS)) {
-                if (traceScope != null) {
-                    traceScope.getSpan().addTimelineAnnotation("Failed to get 
row lock");
-                }
-                throw new TimeoutIOException("Timed out waiting for lock for 
row: " + rowKey);
-            }
-            rowLockContext.setThreadName(Thread.currentThread().getName());
-            success = true;
-            return result;
-        } catch (InterruptedException ie) {
-            LOGGER.warn("Thread interrupted waiting for lock on row: " + 
rowKey);
-            InterruptedIOException iie = new InterruptedIOException();
-            iie.initCause(ie);
-            if (traceScope != null) {
-                traceScope.getSpan().addTimelineAnnotation("Interrupted 
exception getting row lock");
+            // The row is already locked by a different thread. Wait for the 
lock to be released
+            // for waitDuration time
+            long startTime = EnvironmentEdgeManager.currentTimeMillis();
+            RowLockImpl usableRowLock = existingRowLock.lock(waitDuration);
+            if (usableRowLock != null) {
+                return usableRowLock;
             }
-            Thread.currentThread().interrupt();
-            throw iie;
-        } finally {
-            // On failure, clean up the counts just in case this was the thing 
keeping the context alive.
-            if (!success && rowLockContext != null) rowLockContext.cleanUp();
-            if (traceScope != null) {
-                traceScope.close();
+            // The existing lock was released and removed from the hash map 
before the current
+            // thread attempt to lock
+            long now = EnvironmentEdgeManager.currentTimeMillis();
+            long timePassed = now - startTime;
+            if (timePassed > waitDuration) {
+                throw new TimeoutIOException("Timed out waiting for lock for 
row: " + rowKey);
             }
+            waitDuration -= timePassed;
         }
     }
 
     public RowLock lockRow(byte[] row, int waitDuration) throws IOException {
-        // create an object to use a a key in the row lock map
         ImmutableBytesPtr rowKey = new ImmutableBytesPtr(row);
         return lockRow(rowKey, waitDuration);
     }
 
     /**
-     * Unlock the row. We need this stateless way of unlocking because
-     * we have no means of passing the RowLock instances between
-     * coprocessor calls (see HBASE-18482). Once we have that, we
-     * can have the caller collect RowLock instances and free when
-     * needed.
-     * @param row the row key
-     * @throws IOException
+     * Class used to represent a lock on a row.
      */
-    public void unlockRow(byte[] row) throws IOException {
-        ImmutableBytesPtr rowKey = new ImmutableBytesPtr(row);
-        RowLockContext lockContext = lockedRows.get(rowKey);
-        if (lockContext != null) {
-            lockContext.releaseRowLock();
-        }
-    }
-
-    class RowLockContext {
+    public class RowLockImpl implements RowLock {
         private final ImmutableBytesPtr rowKey;
-        // TODO: consider making this non atomic. It's only saving one
-        // synchronization in the case of cleanup() when more than one
-        // thread is holding on to the lock.
-        private final AtomicInteger count = new AtomicInteger(0);
-        private final ReentrantLock reentrantLock = new ReentrantLock(true);
-        // TODO: remove once we can pass List<RowLock> as needed through
-        // coprocessor calls.
-        private volatile RowLockImpl rowLock = RowLockImpl.UNINITIALIZED;
+        private int count = 1;
+        private boolean usable = true;
+        private final ReentrantLock lock = new ReentrantLock(true);
         private String threadName;
 
-        RowLockContext(ImmutableBytesPtr rowKey) {
+        private RowLockImpl(ImmutableBytesPtr rowKey) {
             this.rowKey = rowKey;
+            lock.lock();
+            threadName = Thread.currentThread().getName();
         }
 
-        RowLockImpl newRowLock() {
-            count.incrementAndGet();
+        RowLockImpl lock(long waitDuration) throws IOException{

Review Comment:
   nit: `waitDurationMs` to indicate the unit is milliseconds?



##########
phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/LockManager.java:
##########
@@ -21,224 +21,148 @@
 import java.io.InterruptedIOException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * 
- * Class, copied for the most part from HRegion.getRowLockInternal 
implementation
- * that manages reentrant row locks based on the row key. Phoenix needs to 
manage
- * it's own locking due to secondary indexes needing a consistent snapshot from
+ * Manages reentrant row locks based on row keys. Phoenix needs to manage
+ * its own locking due to secondary indexes needing a consistent snapshot from
  * the time the mvcc is acquired until the time it is advanced (PHOENIX-4053).
  *
  */
 public class LockManager {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LockManager.class);
 
-    private final ConcurrentHashMap<ImmutableBytesPtr, RowLockContext> 
lockedRows =
-            new ConcurrentHashMap<ImmutableBytesPtr, RowLockContext>();
+    private final ConcurrentHashMap<ImmutableBytesPtr, RowLockImpl> lockedRows 
=
+            new ConcurrentHashMap<>();
 
     public LockManager () {
     }
 
     /**
      * Lock the row or throw otherwise
-     * @param rowKey the row key
-     * @return RowLock used to eventually release the lock 
+     * @param rowKey
+     * @param waitDuration
+     * @return
      * @throws TimeoutIOException if the lock could not be acquired within the
      * allowed rowLockWaitDuration and InterruptedException if interrupted 
while
      * waiting to acquire lock.
      */
     public RowLock lockRow(ImmutableBytesPtr rowKey, int waitDuration) throws 
IOException {
-        RowLockContext rowLockContext = null;
-        RowLockImpl result = null;
-        TraceScope traceScope = null;
-
-        // If we're tracing start a span to show how long this took.
-        if (Trace.isTracing()) {
-            traceScope = Trace.startSpan("LockManager.getRowLock");
-            traceScope.getSpan().addTimelineAnnotation("Getting a lock");
-        }
-
-        boolean success = false;
-        try {
-            // Keep trying until we have a lock or error out.
-            // TODO: do we need to add a time component here?
-            while (result == null) {
-
-                // Try adding a RowLockContext to the lockedRows.
-                // If we can add it then there's no other transactions 
currently running.
-                rowLockContext = new RowLockContext(rowKey);
-                RowLockContext existingContext = 
lockedRows.putIfAbsent(rowKey, rowLockContext);
-
-                // if there was a running transaction then there's already a 
context.
-                if (existingContext != null) {
-                    rowLockContext = existingContext;
-                }
-
-                result = rowLockContext.newRowLock();
+        RowLockImpl rowLock = new RowLockImpl(rowKey);
+        while (true) {
+            RowLockImpl existingRowLock = lockedRows.putIfAbsent(rowKey, 
rowLock);
+            if (existingRowLock == null) {
+                // The row was not locked
+                return rowLock;
             }
-            if (!result.getLock().tryLock(waitDuration, 
TimeUnit.MILLISECONDS)) {
-                if (traceScope != null) {
-                    traceScope.getSpan().addTimelineAnnotation("Failed to get 
row lock");
-                }
-                throw new TimeoutIOException("Timed out waiting for lock for 
row: " + rowKey);
-            }
-            rowLockContext.setThreadName(Thread.currentThread().getName());
-            success = true;
-            return result;
-        } catch (InterruptedException ie) {
-            LOGGER.warn("Thread interrupted waiting for lock on row: " + 
rowKey);
-            InterruptedIOException iie = new InterruptedIOException();
-            iie.initCause(ie);
-            if (traceScope != null) {
-                traceScope.getSpan().addTimelineAnnotation("Interrupted 
exception getting row lock");
+            // The row is already locked by a different thread. Wait for the 
lock to be released
+            // for waitDuration time
+            long startTime = EnvironmentEdgeManager.currentTimeMillis();
+            RowLockImpl usableRowLock = existingRowLock.lock(waitDuration);
+            if (usableRowLock != null) {
+                return usableRowLock;
             }
-            Thread.currentThread().interrupt();
-            throw iie;
-        } finally {
-            // On failure, clean up the counts just in case this was the thing 
keeping the context alive.
-            if (!success && rowLockContext != null) rowLockContext.cleanUp();
-            if (traceScope != null) {
-                traceScope.close();
+            // The existing lock was released and removed from the hash map 
before the current
+            // thread attempt to lock
+            long now = EnvironmentEdgeManager.currentTimeMillis();
+            long timePassed = now - startTime;
+            if (timePassed > waitDuration) {
+                throw new TimeoutIOException("Timed out waiting for lock for 
row: " + rowKey);
             }
+            waitDuration -= timePassed;
         }
     }
 
     public RowLock lockRow(byte[] row, int waitDuration) throws IOException {
-        // create an object to use a a key in the row lock map
         ImmutableBytesPtr rowKey = new ImmutableBytesPtr(row);
         return lockRow(rowKey, waitDuration);
     }
 
     /**
-     * Unlock the row. We need this stateless way of unlocking because
-     * we have no means of passing the RowLock instances between
-     * coprocessor calls (see HBASE-18482). Once we have that, we
-     * can have the caller collect RowLock instances and free when
-     * needed.
-     * @param row the row key
-     * @throws IOException
+     * Class used to represent a lock on a row.
      */
-    public void unlockRow(byte[] row) throws IOException {
-        ImmutableBytesPtr rowKey = new ImmutableBytesPtr(row);
-        RowLockContext lockContext = lockedRows.get(rowKey);
-        if (lockContext != null) {
-            lockContext.releaseRowLock();
-        }
-    }
-
-    class RowLockContext {
+    public class RowLockImpl implements RowLock {
         private final ImmutableBytesPtr rowKey;
-        // TODO: consider making this non atomic. It's only saving one
-        // synchronization in the case of cleanup() when more than one
-        // thread is holding on to the lock.
-        private final AtomicInteger count = new AtomicInteger(0);
-        private final ReentrantLock reentrantLock = new ReentrantLock(true);
-        // TODO: remove once we can pass List<RowLock> as needed through
-        // coprocessor calls.
-        private volatile RowLockImpl rowLock = RowLockImpl.UNINITIALIZED;
+        private int count = 1;
+        private boolean usable = true;
+        private final ReentrantLock lock = new ReentrantLock(true);
         private String threadName;
 
-        RowLockContext(ImmutableBytesPtr rowKey) {
+        private RowLockImpl(ImmutableBytesPtr rowKey) {
             this.rowKey = rowKey;
+            lock.lock();
+            threadName = Thread.currentThread().getName();
         }
 
-        RowLockImpl newRowLock() {
-            count.incrementAndGet();
+        RowLockImpl lock(long waitDuration) throws IOException{
             synchronized (this) {
-                if (rowLock != null) {
-                    rowLock = new RowLockImpl(this, reentrantLock);
-                    return rowLock;
-                } else {
+                if (!usable) {
                     return null;
                 }
+                count++;
             }
-        }
-
-        void releaseRowLock() {
-            synchronized (this) {
-                if (rowLock != null) {
-                    rowLock.release();
+            boolean success = false;
+            threadName = Thread.currentThread().getName();
+            try {
+                if (!lock.tryLock(waitDuration, TimeUnit.MILLISECONDS)) {
+                    throw new TimeoutIOException("Timed out waiting for lock 
for row: " + rowKey);
                 }
-            }
-        }
-        
-        void cleanUp() {
-            long c = count.decrementAndGet();
-            if (c <= 0) {
-                synchronized (this) {
-                    if (count.get() <= 0 && rowLock != null){
-                        rowLock = null;
-                        RowLockContext removed = lockedRows.remove(rowKey);
-                        assert removed == this: "we should never remove a 
different context";
+                success = true;
+            } catch (InterruptedException ie) {
+                LOGGER.warn("Thread interrupted waiting for lock on row: " + 
rowKey);
+                InterruptedIOException iie = new InterruptedIOException();
+                iie.initCause(ie);
+                Thread.currentThread().interrupt();
+                throw iie;
+            } finally {
+                if (!success) {
+                    synchronized (this) {
+                        count--;
                     }
                 }
             }
-        }
-
-        void setThreadName(String threadName) {
-            this.threadName = threadName;
-        }
-
-        @Override
-        public String toString() {
-            return "RowLockContext{" +
-                    "row=" + rowKey +
-                    ", readWriteLock=" + reentrantLock +
-                    ", count=" + count +
-                    ", threadName=" + threadName +
-                    '}';
-        }
-    }
-
-    /**
-     * Class used to represent a lock on a row.
-     */
-    public static class RowLockImpl implements RowLock {
-        static final RowLockImpl UNINITIALIZED = new RowLockImpl();
-        private final RowLockContext context;
-        private final Lock lock;
-
-        private RowLockImpl() {
-            context = null;
-            lock = null;
-        }
-        
-        RowLockImpl(RowLockContext context, Lock lock) {
-            this.context = context;
-            this.lock = lock;
-        }
-
-        Lock getLock() {
-            return lock;
+            if (success) {
+                return this;
+            }
+            return null;

Review Comment:
   Since `InterruptedException` is being re-thrown, `success` here will always 
be true so we will never return null here. We can directly return `this` 
instead of checking for `success` value.



##########
phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/LockManager.java:
##########
@@ -21,224 +21,148 @@
 import java.io.InterruptedIOException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * 
- * Class, copied for the most part from HRegion.getRowLockInternal 
implementation
- * that manages reentrant row locks based on the row key. Phoenix needs to 
manage
- * it's own locking due to secondary indexes needing a consistent snapshot from
+ * Manages reentrant row locks based on row keys. Phoenix needs to manage
+ * its own locking due to secondary indexes needing a consistent snapshot from
  * the time the mvcc is acquired until the time it is advanced (PHOENIX-4053).
  *
  */
 public class LockManager {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LockManager.class);
 
-    private final ConcurrentHashMap<ImmutableBytesPtr, RowLockContext> 
lockedRows =
-            new ConcurrentHashMap<ImmutableBytesPtr, RowLockContext>();
+    private final ConcurrentHashMap<ImmutableBytesPtr, RowLockImpl> lockedRows 
=
+            new ConcurrentHashMap<>();
 
     public LockManager () {
     }
 
     /**
      * Lock the row or throw otherwise
-     * @param rowKey the row key
-     * @return RowLock used to eventually release the lock 

Review Comment:
   nit: shall we keep `@return` with description as is?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to